SSF Tools - Cache Service Architecture¶
Overview¶
NOTE: This is a first draft meant to capture ideas about what a caching service might look like. Some or even most of the features described here may not be implemented, at least not as part of the MVP. Only the following will be implemented in the MVP:
1. A cache service for downloading wordlists used by credential scanning and other wordlist-enabled features in entropy analyze (using the HTTP cache category).
2. The cache will use platformdirs to place the downloaded wordlists in an OS-appropriate directory.
3. Files will be cached for 168 hours (1 week) before the HTTPClient service checks for new content.
4. CLI commands will be implemented to show, clear, clean up, and list available categories. Only the HTTP category will be implemented at this time.
The Cache Service provides a unified, protocol-based approach to caching across all SSF Tools commands. This architecture enables consistent cache management while maintaining loose coupling through dependency injection patterns and supporting multiple cache categories with proper lifecycle management.
Architectural Principles¶
Design Goals¶
- Unified Management: Single cache service for all SSF Tools components
- Category-Based Organization: Logical separation of different cache types
- Dependency Injection: Clean service boundaries with protocol contracts
- Configuration Integration: Centralized cache settings in global configuration
- CLI Management: Complete command-line interface for cache operations
- Performance Optimization: Efficient storage and retrieval mechanisms
- Lifecycle Management: Automatic cleanup and expiration handling
Key Benefits¶
- Consistency: All commands use the same caching strategy
- Monitoring: Centralized cache statistics and management
- Testability: Easy to mock cache service for testing
- Maintenance: Unified cache cleanup and management operations
- Performance: Shared cache reduces duplication across commands
Architecture Overview¶
Cache Service Protocols¶
The cache architecture is built around protocols that define the contracts for cache management.
CacheServiceProtocol¶
This protocol defines the interface for unified cache management:
```python
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Protocol

from pydantic import BaseModel


class CacheCategory(str, Enum):
    """Standard cache categories across SSF Tools."""

    HTTP_RESPONSES = "http_responses"
    WORDLISTS = "wordlists"
    FILE_HASHES = "file_hashes"
    ENCODING_DETECTION = "encoding_detection"
    MIME_DETECTION = "mime_detection"
    ENTROPY_THRESHOLDS = "entropy_thresholds"
    TIMESTAMP_FILENAMES = "timestamp_filenames"
    VOLATILITY_PROFILES = "volatility_profiles"


class CategoryInfo(BaseModel):
    """Information about a specific cache category."""

    size_bytes: int
    item_count: int
    oldest_item: datetime | None
    newest_item: datetime | None
    ttl_hours: int


class CacheInfo(BaseModel):
    """Cache information and statistics."""

    total_size_bytes: int
    total_items: int
    categories: dict[str, CategoryInfo]
    base_cache_dir: Path
    last_cleanup: datetime | None


class CacheServiceProtocol(Protocol):
    """Protocol for unified cache management across SSF Tools."""

    def get_cache_info(self) -> CacheInfo:
        """Get comprehensive cache statistics and information."""

    def clear_cache(self, category: CacheCategory | None = None) -> int:
        """Clear cache by category or all. Returns items cleared."""

    def get_cache_size(self, category: CacheCategory | None = None) -> int:
        """Get cache size in bytes."""

    def cleanup_expired(self, max_age: timedelta | None = None) -> int:
        """Remove expired cache entries. Returns items removed."""

    def list_cache_categories(self) -> list[CacheCategory]:
        """List available cache categories."""

    def get_cache_path(self, category: CacheCategory) -> Path:
        """Get the cache directory path for a category."""

    def is_cache_enabled(self, category: CacheCategory) -> bool:
        """Check if caching is enabled for a category."""

    def get_category_config(self, category: CacheCategory) -> "CategoryConfig":
        """Get configuration for a specific cache category."""
```
Configuration Models¶
Cache Configuration¶
```python
from pathlib import Path

from pydantic import BaseModel, Field, field_validator


class CategoryConfig(BaseModel):
    """Configuration for a specific cache category."""

    enabled: bool = True
    ttl_hours: int = 168  # 1 week default
    max_size_mb: int = 100
    cleanup_threshold: float = 0.8  # Cleanup when 80% full


class CacheConfig(BaseModel):
    """Cache service configuration."""

    base_cache_dir: Path = Path("~/.cache/ssf_tools")
    max_total_size_mb: int = 1024
    default_ttl_hours: int = 168  # 1 week
    cleanup_interval_hours: int = 24
    categories: dict[CacheCategory, CategoryConfig] = Field(default_factory=dict)

    @field_validator("base_cache_dir")
    @classmethod
    def expand_cache_dir(cls, v: Path) -> Path:
        """Expand user directory and resolve path."""
        return Path(v).expanduser().resolve()
```
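To illustrate what the `expand_cache_dir` validator does, here is a minimal `pathlib` sketch of the same normalization (no pydantic required):

```python
from pathlib import Path

# Equivalent of the expand_cache_dir validator: "~" becomes the user's
# home directory and the path is made absolute.
raw = Path("~/.cache/ssf_tools")
expanded = raw.expanduser().resolve()

print(expanded.is_absolute())  # True
```

Normalizing at validation time means every consumer of `CacheConfig` can treat `base_cache_dir` as an absolute path.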
Global Configuration Integration¶
```yaml
# In ssf-tools-config.yaml
global:
  cache_dir: "~/.cache/ssf_tools"   # Base cache directory
  cache_max_size_mb: 1024           # Total cache size limit
  cache_ttl_hours: 168              # Default TTL (1 week)
  cache_cleanup_interval_hours: 24  # Auto-cleanup frequency
  cache_categories:
    wordlists:
      enabled: true
      ttl_hours: 8760  # 1 year for wordlists
      max_size_mb: 500
    http_responses:
      enabled: true
      ttl_hours: 24    # 1 day for HTTP responses
      max_size_mb: 100
    file_hashes:
      enabled: true
      ttl_hours: 168   # 1 week for file hashes
      max_size_mb: 50
    encoding_detection:
      enabled: true
      ttl_hours: 72    # 3 days for encoding detection
      max_size_mb: 10
    mime_detection:
      enabled: true
      ttl_hours: 72    # 3 days for MIME detection
      max_size_mb: 10
    entropy_thresholds:
      enabled: true
      ttl_hours: 720   # 30 days for thresholds
      max_size_mb: 5
    timestamp_filenames:
      enabled: true
      ttl_hours: 1     # 1 hour for timestamp cache
      max_size_mb: 1
    volatility_profiles:
      enabled: true
      ttl_hours: 2160  # 90 days for profiles
      max_size_mb: 200
```
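As a sketch of how the per-category overrides would combine with the global defaults, plain dicts can stand in for the parsed YAML (the structure here is illustrative, not the final loading code):

```python
# Hypothetical merge of global defaults with per-category overrides,
# mirroring the YAML above.
DEFAULTS = {"enabled": True, "ttl_hours": 168, "max_size_mb": 100}

categories = {
    "wordlists": {"ttl_hours": 8760, "max_size_mb": 500},
    "http_responses": {"ttl_hours": 24},
}

def category_config(name: str) -> dict:
    """Return effective settings for a category: defaults plus overrides."""
    return {**DEFAULTS, **categories.get(name, {})}

print(category_config("wordlists")["ttl_hours"])    # 8760
print(category_config("file_hashes")["ttl_hours"])  # 168 (default)
```

Categories absent from the YAML simply inherit every default, which keeps the configuration file short.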
Service Implementation¶
Core Cache Service¶
```python
class CacheService:
    """Unified cache service implementation."""

    def __init__(
        self,
        config: CacheConfig,
        output: RichOutputProtocol,
    ):
        self._config = config
        self._output = output
        self._ensure_cache_directories()

    def _ensure_cache_directories(self) -> None:
        """Create cache directories for all categories."""
        for category in CacheCategory:
            cache_path = self.get_cache_path(category)
            cache_path.mkdir(parents=True, exist_ok=True)

    def get_cache_path(self, category: CacheCategory) -> Path:
        """Get the cache directory path for a category."""
        return self._config.base_cache_dir / category.value

    def get_cache_info(self) -> CacheInfo:
        """Get comprehensive cache statistics."""
        categories = {}
        total_size = 0
        total_items = 0
        for category in CacheCategory:
            category_info = self._get_category_info(category)
            categories[category.value] = category_info
            total_size += category_info.size_bytes
            total_items += category_info.item_count
        return CacheInfo(
            total_size_bytes=total_size,
            total_items=total_items,
            categories=categories,
            base_cache_dir=self._config.base_cache_dir,
            last_cleanup=self._get_last_cleanup_time(),
        )
```
Container Integration¶
Core Container Registration¶
```python
# In src/kp_ssf_tools/containers/core.py
class CoreContainer(containers.DeclarativeContainer):
    """Container for core infrastructure services."""

    # Configuration injection
    config = providers.Configuration()

    # "global" is a Python keyword, so the YAML section cannot be reached
    # with plain attribute access (config.global is a syntax error); fetch
    # the section once with getattr() instead.
    _global = getattr(config, "global")

    # Cache service configuration
    cache_config: providers.Factory[CacheConfig] = providers.Factory(
        CacheConfig,
        base_cache_dir=_global.cache_dir,
        max_total_size_mb=_global.cache_max_size_mb.as_(int),
        default_ttl_hours=_global.cache_ttl_hours.as_(int),
        cleanup_interval_hours=_global.cache_cleanup_interval_hours.as_(int),
        categories=_global.cache_categories,
    )

    # Unified cache service
    cache: providers.Singleton[CacheService] = providers.Singleton(
        CacheService,
        config=cache_config,
        output=rich_output,
    )

    # Core services aggregate for convenience
    core_services = providers.Aggregate(
        output=rich_output,
        http=http_client,
        timestamp=timestamp,
        cache=cache,  # Add cache to core services
        # ... other services
    )
```
CLI Integration¶
Cache Management Commands¶
```python
# src/kp_ssf_tools/cli/commands/cache.py
import click
from dependency_injector.wiring import inject, Provide

from kp_ssf_tools.containers import ApplicationContainer
from kp_ssf_tools.services.cache import CacheCategory  # illustrative import path


@click.group()
def cache():
    """Cache management commands for SSF Tools."""
    pass


@click.command()
@inject
def show(
    cache_service=Provide[ApplicationContainer.core.cache],
    output=Provide[ApplicationContainer.core.rich_output],
):
    """Show cache information and statistics."""
    try:
        cache_info = cache_service.get_cache_info()

        # Display cache overview
        output.info(f"Cache Directory: {cache_info.base_cache_dir}")
        output.info(f"Total Size: {cache_info.total_size_bytes / 1024 / 1024:.1f} MB")
        output.info(f"Total Items: {cache_info.total_items:,}")

        # Display category breakdown
        if cache_info.categories:
            table_data = []
            for category, info in cache_info.categories.items():
                table_data.append([
                    category,
                    f"{info.size_bytes / 1024 / 1024:.1f} MB",
                    f"{info.item_count:,}",
                    f"{info.ttl_hours}h",
                ])
            output.table(
                table_data,
                headers=["Category", "Size", "Items", "TTL"],
                title="Cache Categories",
            )
    except Exception as e:
        output.error(f"Failed to get cache information: {e}")
        raise click.ClickException(str(e))


@click.command()
@click.option(
    "--category",
    type=click.Choice([c.value for c in CacheCategory]),
    help="Clear specific cache category",
)
@click.option(
    "--older-than",
    help="Clear entries older than specified time (e.g., '30d', '1w', '24h')",
)
@click.option("--force", is_flag=True, help="Skip confirmation prompt")
@inject
def clear(
    category: str | None,
    older_than: str | None,
    force: bool,
    cache_service=Provide[ApplicationContainer.core.cache],
    output=Provide[ApplicationContainer.core.rich_output],
):
    """Clear cache entries."""
    try:
        if not force:
            if category:
                message = f"Clear cache category '{category}'?"
            elif older_than:
                message = f"Clear cache entries older than {older_than}?"
            else:
                message = "Clear entire cache?"
            if not output.confirm(message):
                output.info("Cache clear cancelled.")
                return

        if older_than:
            max_age = parse_time_delta(older_than)
            cleared = cache_service.cleanup_expired(max_age)
        else:
            cache_category = CacheCategory(category) if category else None
            cleared = cache_service.clear_cache(cache_category)

        output.success(f"Cleared {cleared} cache entries.")
    except Exception as e:
        output.error(f"Failed to clear cache: {e}")
        raise click.ClickException(str(e))


@click.command()
@inject
def cleanup(
    cache_service=Provide[ApplicationContainer.core.cache],
    output=Provide[ApplicationContainer.core.rich_output],
):
    """Remove expired cache entries."""
    try:
        removed = cache_service.cleanup_expired()
        output.success(f"Removed {removed} expired cache entries.")
    except Exception as e:
        output.error(f"Failed to cleanup cache: {e}")
        raise click.ClickException(str(e))


@click.command()
@inject
def categories(
    cache_service=Provide[ApplicationContainer.core.cache],
    output=Provide[ApplicationContainer.core.rich_output],
):
    """List available cache categories."""
    try:
        categories = cache_service.list_cache_categories()
        for category in categories:
            config = cache_service.get_category_config(category)
            enabled_status = "✓" if config.enabled else "✗"
            output.info(f"{enabled_status} {category.value} (TTL: {config.ttl_hours}h)")
    except Exception as e:
        output.error(f"Failed to list cache categories: {e}")
        raise click.ClickException(str(e))


# Register commands
cache.add_command(show)
cache.add_command(clear)
cache.add_command(cleanup)
cache.add_command(categories)
```
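The `parse_time_delta` helper used by `clear` is not defined in this draft. One possible sketch, supporting the `h`/`d`/`w` suffixes shown in the option help (name and behavior are assumptions):

```python
import re
from datetime import timedelta

# Map suffix letters to timedelta keyword arguments.
_UNITS = {"h": "hours", "d": "days", "w": "weeks"}

def parse_time_delta(value: str) -> timedelta:
    """Parse strings like '24h', '30d', or '1w' into a timedelta."""
    match = re.fullmatch(r"(\d+)([hdw])", value.strip().lower())
    if match is None:
        raise ValueError(f"Invalid time delta: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})

print(parse_time_delta("30d"))                       # 30 days, 0:00:00
print(parse_time_delta("1w") == timedelta(weeks=1))  # True
```

Rejecting anything that is not digits-plus-suffix keeps CLI errors loud rather than silently misinterpreting input.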
CLI Command Registration¶
```python
# In src/kp_ssf_tools/cli/main.py
def register_commands() -> None:
    """Register all CLI commands."""
    from kp_ssf_tools.cli.commands.volatility import volatility
    from kp_ssf_tools.cli.commands.entropy import entropy
    from kp_ssf_tools.cli.commands.config import config
    from kp_ssf_tools.cli.commands.cache import cache  # Add cache commands

    cli.add_command(volatility)
    cli.add_command(entropy)
    cli.add_command(config)
    cli.add_command(cache)  # Register cache commands
```
Service Integration Patterns¶
HTTP Client Integration¶
```python
class HttpClientService:
    """HTTP client with integrated caching."""

    def __init__(
        self,
        config: HttpConfig,
        cache: CacheServiceProtocol,
        output: RichOutputProtocol,
    ):
        self._config = config
        self._cache = cache
        self._output = output

    async def get(self, url: str, use_cache: bool = True) -> HttpResponse:
        """Make GET request with caching support."""
        if use_cache and self._cache.is_cache_enabled(CacheCategory.HTTP_RESPONSES):
            cached_response = self._get_cached_response(url)
            if cached_response:
                return cached_response

        response = await self._make_request(url)

        if use_cache and response.is_success:
            self._cache_response(url, response)

        return response
```
Entropy Service Integration¶
```python
class WordlistManager:
    """Wordlist management with caching."""

    def __init__(
        self,
        cache: CacheServiceProtocol,
        http_client: HttpClientProtocol,
        output: RichOutputProtocol,
    ):
        self._cache = cache
        self._http = http_client
        self._output = output

    def ensure_wordlists_available(self, force_refresh: bool = False) -> None:
        """Ensure wordlists are cached and up-to-date."""
        if force_refresh:
            self._cache.clear_cache(CacheCategory.WORDLISTS)

        wordlist_path = self._cache.get_cache_path(CacheCategory.WORDLISTS)
        if not self._wordlists_exist(wordlist_path):
            self._download_wordlists(wordlist_path)
```
Usage Examples¶
Basic Cache Operations¶
```bash
# Show cache information
ssf_tools cache show

# Clear all cache
ssf_tools cache clear

# Clear specific category
ssf_tools cache clear --category wordlists

# Clear old entries
ssf_tools cache clear --older-than 30d

# Remove expired entries
ssf_tools cache cleanup

# List categories
ssf_tools cache categories
```
Service Usage in Commands¶
```python
@inject
def entropy_analyze(
    target: Path,
    cache_service=Provide[ApplicationContainer.core.cache],
    analyzer=Provide[ApplicationContainer.entropy.analyzer],
):
    """Entropy analysis with cache integration."""
    # Check if we need to refresh wordlists cache
    if cache_service.is_cache_enabled(CacheCategory.WORDLISTS):
        wordlist_path = cache_service.get_cache_path(CacheCategory.WORDLISTS)
        if not wordlist_path.exists():
            # Trigger wordlist download
            pass

    # Perform analysis with cached thresholds
    result = analyzer.analyze_target(target)
    return result
```
Performance Considerations¶
Caching Strategy¶
- Hot Data: Frequently accessed items kept in memory with LRU eviction
- Cold Storage: Less frequent items stored on disk with metadata indexing
- Lazy Loading: Cache directories created on-demand
Storage Optimization¶
- Compression: Large cache items (wordlists, profiles) stored compressed
- Deduplication: Hash-based deduplication for identical cache entries
- Metadata Indexing: Fast lookup without scanning directory contents
- Automatic Cleanup: Background cleanup based on TTL and size limits
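Compression and hash-based deduplication combine naturally when entries are keyed by content digest: identical payloads hash to the same filename, so re-caching costs nothing. A small stdlib sketch (function names are illustrative):

```python
import gzip
import hashlib
import tempfile
from pathlib import Path

def store_deduplicated(cache_dir: Path, data: bytes) -> Path:
    """Store data gzip-compressed, keyed by its SHA-256 content hash."""
    digest = hashlib.sha256(data).hexdigest()
    path = cache_dir / f"{digest}.gz"
    if not path.exists():  # dedup: skip the write if content already cached
        path.write_bytes(gzip.compress(data))
    return path

def load(path: Path) -> bytes:
    return gzip.decompress(path.read_bytes())

with tempfile.TemporaryDirectory() as tmp:
    cache_dir = Path(tmp)
    payload = b"password\n" * 10_000  # highly repetitive, compresses well
    first = store_deduplicated(cache_dir, payload)
    second = store_deduplicated(cache_dir, payload)  # same content, same file
    same_file = first == second
    roundtrip_ok = load(first) == payload
    compressed = first.stat().st_size < len(payload)
    print(same_file, roundtrip_ok, compressed)  # True True True
```

Content addressing also gives integrity checking for free: re-hashing a file on read detects corruption.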
Memory Management¶
- Bounded Memory: In-memory cache with configurable size limits
- Streaming: Large cached items streamed rather than loaded entirely
- Resource Cleanup: Automatic cleanup of resources through context managers
- Memory Monitoring: Cache size monitoring with automatic eviction
Testing Patterns¶
Mock Cache Service¶
```python
from pathlib import Path
from unittest.mock import Mock

import pytest


@pytest.fixture
def mock_cache_service():
    """Mock cache service for testing."""
    mock_cache = Mock(spec=CacheServiceProtocol)
    mock_cache.get_cache_info.return_value = CacheInfo(
        total_size_bytes=0,
        total_items=0,
        categories={},
        base_cache_dir=Path("/tmp/test-cache"),
        last_cleanup=None,
    )
    return mock_cache


def test_command_with_cache(mock_cache_service):
    """Test command with mocked cache service."""
    # Test logic with mocked cache
    pass
```
Integration Testing¶
```python
import tempfile
from pathlib import Path


def test_cache_service_integration():
    """Test cache service with real filesystem."""
    with tempfile.TemporaryDirectory() as temp_dir:
        config = CacheConfig(base_cache_dir=Path(temp_dir))
        cache_service = CacheService(config, mock_output)

        # Test cache operations
        info = cache_service.get_cache_info()
        assert info.total_items == 0
```
Implementation Roadmap¶
Phase 1: Core Cache Service¶
- [ ] Implement basic `CacheService` and `CacheServiceProtocol`
- [ ] Create cache configuration models
- [ ] Add cache service to `CoreContainer`
- [ ] Implement basic CLI commands (`show`, `clear`, `cleanup`)
Phase 2: Service Integration¶
- [ ] Integrate cache service with HTTP client
- [ ] Migrate entropy wordlist caching to unified service
- [ ] Update file processing services to use cache
- [ ] Add cache support to timestamp service
Phase 3: Advanced Features¶
- [ ] Implement in-memory hot cache with LRU eviction
- [ ] Add compression for large cache items
- [ ] Implement automatic background cleanup
- [ ] Add cache metrics and monitoring
Phase 4: Optimization¶
- [ ] Add deduplication for cache entries
- [ ] Add cache validation and integrity checking
- [ ] Performance optimization and benchmarking
This cache service architecture provides a robust, unified foundation for all caching needs across SSF Tools while maintaining clean separation of concerns and excellent testability.