
SSF Tools - Cache Service Architecture

Overview

NOTE: This is a first draft meant to capture ideas about what a caching service might look like. Some, or even most, of the features described here might not be implemented, at least not as part of the MVP. Only the following will be implemented in the MVP:

  1. Cache services for downloading wordlists for credential scanning and the wordlist-enabled features in entropy analyze (using the HTTP cache category).
  2. The cache will use platformdirs to place the downloaded wordlists in an OS-specific folder managed by the platformdirs package.
  3. Files will be cached for 168 hours (1 week) before checking for new content (using the HTTPClient service).
  4. CLI commands will be implemented to show, clear, clean up, and list available categories. Only the HTTP category will be implemented at this time.

The Cache Service provides a unified, protocol-based approach to caching across all SSF Tools commands. This architecture enables consistent cache management while maintaining loose coupling through dependency injection patterns and supporting multiple cache categories with proper lifecycle management.

Architectural Principles

Design Goals

  • Unified Management: Single cache service for all SSF Tools components
  • Category-Based Organization: Logical separation of different cache types
  • Dependency Injection: Clean service boundaries with protocol contracts
  • Configuration Integration: Centralized cache settings in global configuration
  • CLI Management: Complete command-line interface for cache operations
  • Performance Optimization: Efficient storage and retrieval mechanisms
  • Lifecycle Management: Automatic cleanup and expiration handling

Key Benefits

  • Consistency: All commands use the same caching strategy
  • Monitoring: Centralized cache statistics and management
  • Testability: Easy to mock cache service for testing
  • Maintenance: Unified cache cleanup and management operations
  • Performance: Shared cache reduces duplication across commands

Architecture Overview

graph TD
    %% Core Cache Components
    CC[Core Container] --> CS[CacheService]
    CC --> CONF[CacheConfig Factory]
    CC --> RO[RichOutputService]
    CONF --> CS
    RO --> CS

    %% Cache Categories
    subgraph CATEGORIES ["🗂️ Cache Categories"]
        HTTP[HTTP_RESPONSES<br/>API responses & downloads]
        WL[WORDLISTS<br/>SecLists & security wordlists]
        FH[FILE_HASHES<br/>File hash computations]
        ENC[ENCODING_DETECTION<br/>Character encoding results]
        MIME[MIME_DETECTION<br/>File type detection]
        ET[ENTROPY_THRESHOLDS<br/>Threshold model cache]
        TS[TIMESTAMP_FILENAMES<br/>Filename timestamp cache]
        VP[VOLATILITY_PROFILES<br/>Memory analysis profiles]
    end

    %% Service Dependencies
    CS --> CATEGORIES

    %% Command Integration
    subgraph COMMANDS ["🛠️ Command Integration"]
        ENTROPY[Entropy Analysis<br/>Uses: WORDLISTS, ET, FH]
        VOLATILITY[Volatility Analysis<br/>Uses: VP, FH]
        HTTP_CLIENT[HTTP Client<br/>Uses: HTTP]
        FILE_PROC[File Processing<br/>Uses: ENC, MIME, FH]
    end
    COMMANDS --> CS

    %% CLI Interface
    CLI["CLI Cache Commands<br/>cache show|clear|cleanup|categories"]
    CLI --> CS

    %% Storage Backend
    subgraph STORAGE ["💾 Storage Backend"]
        FS[File System<br/>Category-based directories]
        MEM[In-Memory<br/>LRU cache for hot data]
        META[Metadata<br/>Timestamps, TTL, sizes]
    end
    CS --> STORAGE

    %% Styling
    classDef coreService fill:#e1f5fe,stroke:#0277bd,stroke-width:2px
    classDef config fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
    classDef category fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    classDef command fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
    classDef cli fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    classDef storage fill:#fce4ec,stroke:#c2185b,stroke-width:2px
    classDef group fill:#f8f9fa,stroke:#6c757d,stroke-width:3px,stroke-dasharray: 5 5

    class CC,CS coreService
    class CONF,RO config
    class HTTP,WL,FH,ENC,MIME,ET,TS,VP category
    class ENTROPY,VOLATILITY,HTTP_CLIENT,FILE_PROC command
    class CLI cli
    class FS,MEM,META storage
    class CATEGORIES,COMMANDS,STORAGE group

Cache Service Protocols

The cache architecture is built around protocols that define the contracts for cache management.

CacheServiceProtocol

This protocol defines the interface for unified cache management:

from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Protocol

from pydantic import BaseModel

class CacheCategory(str, Enum):
    """Standard cache categories across SSF Tools."""
    HTTP_RESPONSES = "http_responses"
    WORDLISTS = "wordlists"
    FILE_HASHES = "file_hashes"
    ENCODING_DETECTION = "encoding_detection"
    MIME_DETECTION = "mime_detection"
    ENTROPY_THRESHOLDS = "entropy_thresholds"
    TIMESTAMP_FILENAMES = "timestamp_filenames"
    VOLATILITY_PROFILES = "volatility_profiles"

# CategoryInfo is declared first because CacheInfo refers to it in a field type.
class CategoryInfo(BaseModel):
    """Information about a specific cache category."""
    size_bytes: int
    item_count: int
    oldest_item: datetime | None
    newest_item: datetime | None
    ttl_hours: int

class CacheInfo(BaseModel):
    """Cache information and statistics."""
    total_size_bytes: int
    total_items: int
    categories: dict[str, CategoryInfo]
    base_cache_dir: Path
    last_cleanup: datetime | None
class CacheServiceProtocol(Protocol):
    """Protocol for unified cache management across SSF Tools."""

    def get_cache_info(self) -> CacheInfo:
        """Get comprehensive cache statistics and information."""

    def clear_cache(self, category: CacheCategory | None = None) -> int:
        """Clear cache by category or all. Returns items cleared."""

    def get_cache_size(self, category: CacheCategory | None = None) -> int:
        """Get cache size in bytes."""

    def cleanup_expired(self, max_age: timedelta | None = None) -> int:
        """Remove expired cache entries. Returns items removed."""

    def list_cache_categories(self) -> list[CacheCategory]:
        """List available cache categories."""

    def get_cache_path(self, category: CacheCategory) -> Path:
        """Get the cache directory path for a category."""

    def is_cache_enabled(self, category: CacheCategory) -> bool:
        """Check if caching is enabled for a category."""

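    # CategoryConfig is defined under "Configuration Models"; the quoted
    # annotation avoids a NameError if this module is loaded first.
    def get_category_config(self, category: CacheCategory) -> "CategoryConfig":
        """Get configuration for a specific cache category."""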

Configuration Models

Cache Configuration

from pathlib import Path

from pydantic import BaseModel, Field, field_validator

class CategoryConfig(BaseModel):
    """Configuration for a specific cache category."""
    enabled: bool = True
    ttl_hours: int = 168  # 1 week default
    max_size_mb: int = 100
    cleanup_threshold: float = 0.8  # Cleanup when 80% full

class CacheConfig(BaseModel):
    """Cache service configuration."""
    # validate_default=True runs the validator below on the default as well,
    # so "~" is expanded even when no cache directory is configured.
    base_cache_dir: Path = Field(default=Path("~/.cache/ssf_tools"), validate_default=True)
    max_total_size_mb: int = 1024
    default_ttl_hours: int = 168  # 1 week
    cleanup_interval_hours: int = 24
    # CacheCategory is the enum defined alongside CacheServiceProtocol.
    categories: dict[CacheCategory, CategoryConfig] = Field(default_factory=dict)

    @field_validator('base_cache_dir')
    @classmethod
    def expand_cache_dir(cls, v: Path) -> Path:
        """Expand user directory and resolve path."""
        return Path(v).expanduser().resolve()
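
The document states only that cleanup happens "when 80% full"; it does not say what gets removed. One plausible reading, sketched below, is to delete the oldest files in a category until usage drops to `max_size_mb * cleanup_threshold`. The oldest-first policy and the function name `evict_oldest` are assumptions made for illustration.

```python
from pathlib import Path


def evict_oldest(directory: Path, max_size_mb: int = 100, cleanup_threshold: float = 0.8) -> int:
    """Delete the oldest files until usage is at or below the threshold.

    Returns the number of files removed. Assumes oldest-first eviction,
    which the design document does not specify.
    """
    limit_bytes = int(max_size_mb * 1024 * 1024 * cleanup_threshold)
    # Sort by modification time so the oldest files come first.
    files = sorted(
        (p for p in directory.rglob("*") if p.is_file()),
        key=lambda p: p.stat().st_mtime,
    )
    used = sum(p.stat().st_size for p in files)
    removed = 0
    for path in files:
        if used <= limit_bytes:
            break  # back under the threshold, stop deleting
        used -= path.stat().st_size
        path.unlink()
        removed += 1
    return removed
```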

Global Configuration Integration

# In ssf-tools-config.yaml
global:
  cache_dir: "~/.cache/ssf_tools"        # Base cache directory
  cache_max_size_mb: 1024                # Total cache size limit
  cache_ttl_hours: 168                   # Default TTL (1 week)
  cache_cleanup_interval_hours: 24       # Auto-cleanup frequency
  cache_categories:
    wordlists:
      enabled: true
      ttl_hours: 8760                    # 1 year for wordlists
      max_size_mb: 500
    http_responses:
      enabled: true
      ttl_hours: 24                      # 1 day for HTTP responses
      max_size_mb: 100
    file_hashes:
      enabled: true
      ttl_hours: 168                     # 1 week for file hashes
      max_size_mb: 50
    encoding_detection:
      enabled: true
      ttl_hours: 72                      # 3 days for encoding detection
      max_size_mb: 10
    mime_detection:
      enabled: true
      ttl_hours: 72                      # 3 days for MIME detection
      max_size_mb: 10
    entropy_thresholds:
      enabled: true
      ttl_hours: 720                     # 30 days for thresholds
      max_size_mb: 5
    timestamp_filenames:
      enabled: true
      ttl_hours: 1                       # 1 hour for timestamp cache
      max_size_mb: 1
    volatility_profiles:
      enabled: true
      ttl_hours: 2160                    # 90 days for profiles
      max_size_mb: 200
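
The YAML above defines a global `cache_ttl_hours` and per-category `ttl_hours` values. How the two interact is not spelled out here; the sketch below assumes a per-category value wins and the global value is the fallback. `resolve_ttl_hours` and the `example` dictionary (a hand-written fragment of the parsed YAML) are hypothetical.

```python
def resolve_ttl_hours(global_cfg: dict, category: str) -> int:
    """Return the category's ttl_hours, else the global cache_ttl_hours, else 168."""
    default_ttl = int(global_cfg.get("cache_ttl_hours", 168))
    overrides = global_cfg.get("cache_categories", {}).get(category, {})
    return int(overrides.get("ttl_hours", default_ttl))


# A fragment of the "global" section as it might look after YAML parsing.
example = {
    "cache_ttl_hours": 168,
    "cache_categories": {
        "wordlists": {"enabled": True, "ttl_hours": 8760},
        "http_responses": {"enabled": True, "ttl_hours": 24},
        "file_hashes": {"enabled": True},  # no ttl_hours: uses the global default
    },
}

wordlist_ttl = resolve_ttl_hours(example, "wordlists")       # per-category value
file_hash_ttl = resolve_ttl_hours(example, "file_hashes")    # global fallback
```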

Service Implementation

Core Cache Service

class CacheService:
    """Unified cache service implementation."""

    def __init__(
        self,
        config: CacheConfig,
        output: RichOutputProtocol,
    ):
        self._config = config
        self._output = output
        self._ensure_cache_directories()

    def _ensure_cache_directories(self) -> None:
        """Create cache directories for all categories."""
        for category in CacheCategory:
            cache_path = self.get_cache_path(category)
            cache_path.mkdir(parents=True, exist_ok=True)

    def get_cache_path(self, category: CacheCategory) -> Path:
        """Get the cache directory path for a category."""
        return self._config.base_cache_dir / category.value

    def get_cache_info(self) -> CacheInfo:
        """Get comprehensive cache statistics."""
        categories = {}
        total_size = 0
        total_items = 0

        for category in CacheCategory:
            category_info = self._get_category_info(category)
            categories[category.value] = category_info
            total_size += category_info.size_bytes
            total_items += category_info.item_count

        return CacheInfo(
            total_size_bytes=total_size,
            total_items=total_items,
            categories=categories,
            base_cache_dir=self._config.base_cache_dir,
            last_cleanup=self._get_last_cleanup_time()
        )

Container Integration

Core Container Registration

# In src/kp_ssf_tools/containers/core.py
from dependency_injector import containers, providers

class CoreContainer(containers.DeclarativeContainer):
    """Container for core infrastructure services."""

    # Configuration injection
    config = providers.Configuration()

    # Cache service configuration.
    # "global" is a reserved word in Python, so config.global is a syntax
    # error; the section is reached with item access instead.
    cache_config: providers.Factory[CacheConfig] = providers.Factory(
        CacheConfig,
        base_cache_dir=config["global"]["cache_dir"],
        max_total_size_mb=config["global"]["cache_max_size_mb"].as_(int),
        default_ttl_hours=config["global"]["cache_ttl_hours"].as_(int),
        cleanup_interval_hours=config["global"]["cache_cleanup_interval_hours"].as_(int),
        categories=config["global"]["cache_categories"],
    )

    # Unified cache service
    cache: providers.Singleton[CacheService] = providers.Singleton(
        CacheService,
        config=cache_config,
        output=rich_output,
    )

    # Core services aggregate for convenience
    core_services = providers.Aggregate(
        output=rich_output,
        http=http_client,
        timestamp=timestamp,
        cache=cache,  # Add cache to core services
        # ... other services
    )

CLI Integration

Cache Management Commands

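# src/kp_ssf_tools/cli/commands/cache.py
import click
from dependency_injector.wiring import inject, Provide
from kp_ssf_tools.containers import ApplicationContainer
# CacheCategory and parse_time_delta are also used below; they are assumed
# to be imported from the project's cache modules (paths not shown here).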

@click.group()
def cache():
    """Cache management commands for SSF Tools."""
    pass

@click.command()
@inject
def show(
    cache_service=Provide[ApplicationContainer.core.cache],
    output=Provide[ApplicationContainer.core.rich_output],
):
    """Show cache information and statistics."""
    try:
        cache_info = cache_service.get_cache_info()

        # Display cache overview
        output.info(f"Cache Directory: {cache_info.base_cache_dir}")
        output.info(f"Total Size: {cache_info.total_size_bytes / 1024 / 1024:.1f} MB")
        output.info(f"Total Items: {cache_info.total_items:,}")

        # Display category breakdown
        if cache_info.categories:
            table_data = []
            for category, info in cache_info.categories.items():
                table_data.append([
                    category,
                    f"{info.size_bytes / 1024 / 1024:.1f} MB",
                    f"{info.item_count:,}",
                    f"{info.ttl_hours}h",
                ])

            output.table(
                table_data,
                headers=["Category", "Size", "Items", "TTL"],
                title="Cache Categories"
            )

    except Exception as e:
        output.error(f"Failed to get cache information: {e}")
        raise click.ClickException(str(e))

@click.command()
@click.option("--category", type=click.Choice([c.value for c in CacheCategory]),
              help="Clear specific cache category")
@click.option("--older-than", help="Clear entries older than specified time (e.g., '30d', '1w', '24h')")
@click.option("--force", is_flag=True, help="Skip confirmation prompt")
@inject
def clear(
    category: str | None,
    older_than: str | None,
    force: bool,
    cache_service=Provide[ApplicationContainer.core.cache],
    output=Provide[ApplicationContainer.core.rich_output],
):
    """Clear cache entries."""
    try:
        if not force:
            if category:
                message = f"Clear cache category '{category}'?"
            elif older_than:
                message = f"Clear cache entries older than {older_than}?"
            else:
                message = "Clear entire cache?"

            if not output.confirm(message):
                output.info("Cache clear cancelled.")
                return

        if older_than:
            max_age = parse_time_delta(older_than)
            cleared = cache_service.cleanup_expired(max_age)
        else:
            cache_category = CacheCategory(category) if category else None
            cleared = cache_service.clear_cache(cache_category)

        output.success(f"Cleared {cleared} cache entries.")

    except Exception as e:
        output.error(f"Failed to clear cache: {e}")
        raise click.ClickException(str(e))

@click.command()
@inject
def cleanup(
    cache_service=Provide[ApplicationContainer.core.cache],
    output=Provide[ApplicationContainer.core.rich_output],
):
    """Remove expired cache entries."""
    try:
        removed = cache_service.cleanup_expired()
        output.success(f"Removed {removed} expired cache entries.")

    except Exception as e:
        output.error(f"Failed to cleanup cache: {e}")
        raise click.ClickException(str(e))

@click.command()
@inject
def categories(
    cache_service=Provide[ApplicationContainer.core.cache],
    output=Provide[ApplicationContainer.core.rich_output],
):
    """List available cache categories."""
    try:
        # Named "available" so the local does not shadow this function's name.
        available = cache_service.list_cache_categories()

        for category in available:
            config = cache_service.get_category_config(category)
            enabled_status = "✓" if config.enabled else "✗"
            output.info(f"{enabled_status} {category.value} (TTL: {config.ttl_hours}h)")

    except Exception as e:
        output.error(f"Failed to list cache categories: {e}")
        raise click.ClickException(str(e))

# Register commands
cache.add_command(show)
cache.add_command(clear)
cache.add_command(cleanup)
cache.add_command(categories)
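
The `clear` command calls `parse_time_delta`, which is not defined in this document. The sketch below is one possible implementation covering the three forms named in the `--older-than` help text ('30d', '1w', '24h'). Supporting only the `h`, `d`, and `w` suffixes is an assumption; the real helper may accept more.

```python
import re
from datetime import timedelta

# Suffix -> timedelta keyword. Only the units from the help text are handled.
_UNITS = {"h": "hours", "d": "days", "w": "weeks"}
_PATTERN = re.compile(r"(\d+)([hdw])")


def parse_time_delta(text: str) -> timedelta:
    """Parse strings such as '24h', '30d' or '1w' into a timedelta."""
    match = _PATTERN.fullmatch(text.strip().lower())
    if match is None:
        raise ValueError(
            f"Invalid duration {text!r}; expected forms like '24h', '30d', '1w'"
        )
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})
```

Raising `ValueError` fits the command's existing `except Exception` handler, which turns the message into a `click.ClickException`.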

CLI Command Registration

# In src/kp_ssf_tools/cli/main.py
def register_commands() -> None:
    """Register all CLI commands."""
    from kp_ssf_tools.cli.commands.volatility import volatility
    from kp_ssf_tools.cli.commands.entropy import entropy
    from kp_ssf_tools.cli.commands.config import config
    from kp_ssf_tools.cli.commands.cache import cache  # Add cache commands

    cli.add_command(volatility)
    cli.add_command(entropy)
    cli.add_command(config)
    cli.add_command(cache)  # Register cache commands

Service Integration Patterns

HTTP Client Integration

class HttpClientService:
    """HTTP client with integrated caching."""

    def __init__(
        self,
        config: HttpConfig,
        cache: CacheServiceProtocol,
        output: RichOutputProtocol,
    ):
        self._config = config
        self._cache = cache
        self._output = output

    async def get(self, url: str, use_cache: bool = True) -> HttpResponse:
        """Make GET request with caching support."""
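        # Evaluate the switch once so reads and writes honor the same rule:
        # the per-call flag and the category setting must both allow caching.
        cache_enabled = use_cache and self._cache.is_cache_enabled(CacheCategory.HTTP_RESPONSES)

        if cache_enabled:
            cached_response = self._get_cached_response(url)
            if cached_response is not None:
                return cached_response

        response = await self._make_request(url)

        if cache_enabled and response.is_success:
            self._cache_response(url, response)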

        return response

Entropy Service Integration

class WordlistManager:
    """Wordlist management with caching."""

    def __init__(
        self,
        cache: CacheServiceProtocol,
        http_client: HttpClientProtocol,
        output: RichOutputProtocol,
    ):
        self._cache = cache
        self._http = http_client
        self._output = output

    def ensure_wordlists_available(self, force_refresh: bool = False) -> None:
        """Ensure wordlists are cached and up-to-date."""
        if force_refresh:
            self._cache.clear_cache(CacheCategory.WORDLISTS)

        wordlist_path = self._cache.get_cache_path(CacheCategory.WORDLISTS)
        if not self._wordlists_exist(wordlist_path):
            self._download_wordlists(wordlist_path)

Usage Examples

Basic Cache Operations

# Show cache information
ssf_tools cache show

# Clear all cache
ssf_tools cache clear

# Clear specific category
ssf_tools cache clear --category wordlists

# Clear old entries
ssf_tools cache clear --older-than 30d

# Remove expired entries
ssf_tools cache cleanup

# List categories
ssf_tools cache categories

Service Usage in Commands

@inject
def entropy_analyze(
    target: Path,
    cache_service=Provide[ApplicationContainer.core.cache],
    analyzer=Provide[ApplicationContainer.entropy.analyzer],
):
    """Entropy analysis with cache integration."""

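    # Check if we need to refresh wordlists cache.
    # CacheService creates every category directory on startup, so the
    # directory existing is not enough; it must also contain files.
    if cache_service.is_cache_enabled(CacheCategory.WORDLISTS):
        wordlist_path = cache_service.get_cache_path(CacheCategory.WORDLISTS)
        if not wordlist_path.exists() or not any(wordlist_path.iterdir()):
            # Trigger wordlist download
            pass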

    # Perform analysis with cached thresholds
    result = analyzer.analyze_target(target)
    return result

Performance Considerations

Caching Strategy

  • Hot Data: Frequently accessed items kept in memory with LRU eviction
  • Cold Storage: Less frequent items stored on disk with metadata indexing
  • Lazy Loading: Cache directories created on-demand
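
The hot-data tier described above can be sketched with `collections.OrderedDict`. This is a generic LRU illustration bounded by item count, not the planned SSF Tools implementation; `LRUCache` and `max_items` are hypothetical names.

```python
from collections import OrderedDict
from typing import Generic, Hashable, Optional, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


class LRUCache(Generic[K, V]):
    """Keep at most `max_items` entries, evicting the least recently used."""

    def __init__(self, max_items: int) -> None:
        if max_items < 1:
            raise ValueError("max_items must be at least 1")
        self._max_items = max_items
        self._items: OrderedDict = OrderedDict()

    def get(self, key: K) -> Optional[V]:
        """Return the value and mark the key as most recently used."""
        if key not in self._items:
            return None
        self._items.move_to_end(key)
        return self._items[key]

    def put(self, key: K, value: V) -> None:
        """Insert or update a key, evicting the oldest entry when over capacity."""
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self._max_items:
            self._items.popitem(last=False)  # drop the least recently used

    def __len__(self) -> int:
        return len(self._items)
```

Bounding by item count keeps the sketch short; bounding by bytes, as the Memory Management section suggests, would need a size estimate per entry.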

Storage Optimization

  • Compression: Large cache items (wordlists, profiles) stored compressed
  • Deduplication: Hash-based deduplication for identical cache entries
  • Metadata Indexing: Fast lookup without scanning directory contents
  • Automatic Cleanup: Background cleanup based on TTL and size limits
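
Compression and hash-based deduplication combine naturally in a content-addressed layout: each item is stored under the digest of its uncompressed bytes, so identical content maps to the same file. The sketch below assumes gzip and SHA-256, neither of which this document specifies; the function names are hypothetical.

```python
import gzip
import hashlib
from pathlib import Path


def store_compressed(store_dir: Path, data: bytes) -> Path:
    """Store `data` gzip-compressed under its SHA-256 digest.

    Identical content always yields the same file name, so a second
    write of the same bytes is skipped (deduplication).
    """
    store_dir.mkdir(parents=True, exist_ok=True)
    digest = hashlib.sha256(data).hexdigest()
    target = store_dir / f"{digest}.gz"
    if not target.exists():
        target.write_bytes(gzip.compress(data))
    return target


def load_compressed(path: Path) -> bytes:
    """Read a stored item back into its original bytes."""
    return gzip.decompress(path.read_bytes())
```

A separate index from logical name (URL, wordlist name) to digest would still be needed; it is omitted here.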

Memory Management

  • Bounded Memory: In-memory cache with configurable size limits
  • Streaming: Large cached items streamed rather than loaded entirely
  • Resource Cleanup: Automatic cleanup of resources through context managers
  • Memory Monitoring: Cache size monitoring with automatic eviction
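
Streaming and context-managed cleanup can be shown together with a generator that yields wordlist entries one line at a time. This is a sketch under the assumption that cached wordlists are plain UTF-8 text with one entry per line; `iter_wordlist` is a hypothetical name.

```python
from pathlib import Path
from typing import Iterator


def iter_wordlist(path: Path) -> Iterator[str]:
    """Yield non-empty entries one at a time instead of reading the whole file.

    The `with` block closes the file when the generator is exhausted or
    garbage-collected, so callers do not manage the handle themselves.
    """
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        for line in handle:
            word = line.rstrip("\r\n")
            if word:
                yield word
```

Memory use stays proportional to the longest line rather than to the file size, which matters for multi-megabyte wordlists.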

Testing Patterns

Mock Cache Service

from pathlib import Path
from unittest.mock import Mock

import pytest

@pytest.fixture
def mock_cache_service():
    """Mock cache service for testing."""
    mock_cache = Mock(spec=CacheServiceProtocol)
    mock_cache.get_cache_info.return_value = CacheInfo(
        total_size_bytes=0,
        total_items=0,
        categories={},
        base_cache_dir=Path("/tmp/test-cache"),
        last_cleanup=None
    )
    return mock_cache

def test_command_with_cache(mock_cache_service):
    """Test command with mocked cache service."""
    # Test logic with mocked cache
    pass

Integration Testing

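import tempfile
from pathlib import Path
from unittest.mock import Mock

def test_cache_service_integration():
    """Test cache service with real filesystem."""
    # Output is not under test, so a mock satisfies the constructor.
    mock_output = Mock(spec=RichOutputProtocol)
    with tempfile.TemporaryDirectory() as temp_dir:
        config = CacheConfig(base_cache_dir=Path(temp_dir))
        cache_service = CacheService(config, mock_output)

        # Test cache operations
        info = cache_service.get_cache_info()
        assert info.total_items == 0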

Implementation Roadmap

Phase 1: Core Cache Service

  • [ ] Implement basic CacheService and CacheServiceProtocol
  • [ ] Create cache configuration models
  • [ ] Add cache service to CoreContainer
  • [ ] Implement basic CLI commands (show, clear, cleanup, categories)

Phase 2: Service Integration

  • [ ] Integrate cache service with HTTP client
  • [ ] Migrate entropy wordlist caching to unified service
  • [ ] Update file processing services to use cache
  • [ ] Add cache support to timestamp service

Phase 3: Advanced Features

  • [ ] Implement in-memory hot cache with LRU eviction
  • [ ] Add compression for large cache items
  • [ ] Implement automatic background cleanup
  • [ ] Add cache metrics and monitoring

Phase 4: Optimization

  • [ ] Add deduplication for cache entries
  • [ ] Add cache validation and integrity checking
  • [ ] Performance optimization and benchmarking

This cache service architecture provides a robust, unified foundation for all caching needs across SSF Tools while maintaining clean separation of concerns and excellent testability.