You are building a persistent key-value storage system for a distributed data platform. The system must survive crashes and restarts while handling concurrent access from multiple threads. This is critical infrastructure that powers data pipelines and analytics workloads.
The key challenges are:
```
┌─────────────────────────────────────────┐
│             DurableKVStore              │
├─────────────────────────────────────────┤
│  In-Memory Map (HashMap)                │
│  + ReadWriteLock                        │
└──────────┬─────────────────┬────────────┘
           │                 │
    ┌──────▼──────┐    ┌─────▼──────────┐
    │  WAL File   │    │ Snapshot File  │
    │  (append)   │    │  (periodic)    │
    └─────────────┘    └────────────────┘

Write Path: WAL → flush → Memory
Recovery:   Snapshot → WAL replay → Memory
```
Hint: WAL-First Ordering
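The write path in the architecture diagram (WAL → flush → Memory) can be sketched roughly as follows. This is a minimal illustration, not the reference solution: the class name `WalFirstWriter`, the file name `wal.log`, the nanosecond timestamp, and the use of `os.fsync` are all assumptions made for the example.

```python
import os
import threading
import time


class WalFirstWriter:
    """Hypothetical helper: log to the WAL and flush before touching memory."""

    def __init__(self, data_directory: str):
        os.makedirs(data_directory, exist_ok=True)
        # Assumed file name; the original does not specify one.
        self.wal_path = os.path.join(data_directory, "wal.log")
        self.wal = open(self.wal_path, "a", encoding="utf-8")
        self.wal_lock = threading.Lock()
        self.store = {}

    def put(self, key: str, value: str) -> None:
        record = f"PUT|{time.time_ns()}|{key}|{value}\n"
        with self.wal_lock:
            # 1. Append the record and force it to disk.
            self.wal.write(record)
            self.wal.flush()
            os.fsync(self.wal.fileno())
            # 2. Only after the flush succeeds, apply the change in memory.
            self.store[key] = value

    def close(self) -> None:
        self.wal.close()
```

If the process crashes after step 1 but before step 2, the record is still on disk and can be replayed on restart; if the write or flush raises, the in-memory map is never updated, so memory never gets ahead of the log.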
Hint: Separate Locks for Performance

Use separate locks for the WAL and the in-memory store to maximize concurrency:

```python
self.store = {}                      # In-memory storage
self.store_lock = ReadWriteLock()    # Multiple readers OR single writer
self.wal_lock = Lock()               # Serializes WAL writes only

# Read path (high concurrency)
def get(self, key: str):
    self.store_lock.acquire_read()
    try:
        return self.store.get(key)
    finally:
        self.store_lock.release_read()
```

Benefits:
- Multiple concurrent readers (ReadWriteLock)
- WAL writes (disk I/O) don't block reads, because the WAL has its own lock; only the brief in-memory update takes the write lock
- Roughly 10x performance improvement for read-heavy workloads
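Python's standard library does not provide a `ReadWriteLock`, so the class used in the hint has to be supplied. One possible version built on `threading.Condition` is sketched below. The `acquire_read`/`release_read` names mirror the calls in the hint; `acquire_write`/`release_write` are assumed names. This sketch favors readers and makes no attempt to prevent writer starvation.

```python
import threading


class ReadWriteLock:
    """Many concurrent readers OR one writer (reader-preference sketch)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0        # number of active readers
        self._writing = False    # True while a writer holds the lock

    def acquire_read(self):
        with self._cond:
            # Readers wait only while a writer is active.
            while self._writing:
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                # Last reader out wakes any waiting writer.
                self._cond.notify_all()

    def acquire_write(self):
        with self._cond:
            # A writer needs exclusive access: no readers, no other writer.
            while self._writing or self._readers > 0:
                self._cond.wait()
            self._writing = True

    def release_write(self):
        with self._cond:
            self._writing = False
            self._cond.notify_all()
```

Wrapping each acquire/release pair in `try`/`finally`, as the `get` example does, keeps the lock from leaking if the guarded code raises.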
Hint: File Formats

WAL Format:

    PUT|timestamp|key|value
    DELETE|timestamp|key

Snapshot Format:

    SNAPSHOT|timestamp
    key1|value1
    key2|value2

During recovery:
- Load snapshot (if exists)
- Replay WAL entries with timestamp > snapshot timestamp
- This gives you the complete state
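The recovery steps above can be sketched as a pure function over the two file formats. This is an illustrative sketch under stated assumptions: the function name `recover` is hypothetical, timestamps are taken to be integers, and keys and values are assumed never to contain `|` or newlines (the formats as given define no escaping).

```python
from typing import Dict, Iterable


def recover(snapshot_lines: Iterable[str], wal_lines: Iterable[str]) -> Dict[str, str]:
    """Rebuild the in-memory map from snapshot lines plus newer WAL entries."""
    store: Dict[str, str] = {}
    snapshot_ts = -1  # with no snapshot, every WAL entry is replayed

    lines = [ln.rstrip("\n") for ln in snapshot_lines if ln.strip()]
    if lines:
        # Header line: SNAPSHOT|timestamp
        _, ts = lines[0].split("|", 1)
        snapshot_ts = int(ts)
        for line in lines[1:]:
            key, value = line.split("|", 1)
            store[key] = value

    for raw in wal_lines:
        line = raw.rstrip("\n")
        if not line:
            continue
        op, ts, rest = line.split("|", 2)
        if int(ts) <= snapshot_ts:
            continue  # already reflected in the snapshot
        if op == "PUT":
            key, value = rest.split("|", 1)
            store[key] = value
        elif op == "DELETE":
            store.pop(rest, None)
    return store


snapshot = ["SNAPSHOT|100", "a|1", "b|2"]
wal = ["PUT|90|a|old", "PUT|110|c|3", "DELETE|120|b"]
print(recover(snapshot, wal))  # {'a': '1', 'c': '3'}
```

The entry stamped 90 is skipped because it predates the snapshot, while the entries stamped 110 and 120 are applied on top of it.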
Full Solution (Python)

Time complexity:
- `get(key)`: O(1) average
- `put(key, value)`: O(1) average + disk I/O
- `delete(key)`: O(1) average + disk I/O
- `create_snapshot()`: O(N) where N is number of keys
- Recovery: O(N + W) where W is WAL entries

Space complexity: O(N) for in-memory storage plus O(W) for WAL
Critical design decisions:
- WAL-first ordering ensures durability: each operation is appended to the WAL and flushed before it is applied in memory
- ReadWriteLock allows multiple concurrent readers (9.5x improvement for read-heavy workloads)
- Separate locks prevent WAL writes from blocking reads
- Timestamp filtering during recovery avoids replaying already-snapshotted entries
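Snapshot creation and WAL compaction, which the timestamp filter depends on, might look like the sketch below. The function name `write_snapshot`, the file names `snapshot.dat` and `wal.log`, and the write-to-temp-then-`os.replace` approach are assumptions chosen for illustration; the original does not specify how the snapshot file is written.

```python
import os
import time
from typing import Dict


def write_snapshot(data_directory: str, store: Dict[str, str]) -> int:
    """Write the snapshot via a temp file, then truncate the WAL. Returns the timestamp."""
    snapshot_path = os.path.join(data_directory, "snapshot.dat")  # assumed name
    wal_path = os.path.join(data_directory, "wal.log")            # assumed name
    tmp_path = snapshot_path + ".tmp"
    ts = time.time_ns()

    # Write to a temporary file first so a crash never leaves a half-written snapshot.
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(f"SNAPSHOT|{ts}\n")
        for key, value in store.items():
            f.write(f"{key}|{value}\n")
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, snapshot_path)  # swap the finished file into place

    # Compact: every entry logged so far is now covered by the snapshot.
    with open(wal_path, "w", encoding="utf-8"):
        pass
    return ts
```

The caller is assumed to hold both the store lock and the WAL lock for the duration of the call, so that no write can land between capturing the snapshot and truncating the WAL.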
import threading
from typing import Optional


class DurableKVStore:
    """
    Thread-safe, persistent key-value store with WAL and snapshots.

    This is a simplified in-memory implementation for testing.
    A production version would write WAL and snapshots to actual files.
    """

    def __init__(self, data_directory: str):
        """
        Initialize store with directory for WAL and snapshots.

        Args:
            data_directory: Directory path for storing WAL and snapshot files
        """
        pass

    def put(self, key: str, value: str):
        """
        Store a key-value pair (durable).

        Args:
            key: The key to store
            value: The value to associate with the key
        """
        pass

    def get(self, key: str) -> Optional[str]:
        """
        Retrieve value for a key.

        Args:
            key: The key to look up

        Returns:
            The value associated with the key, or None if not found
        """
        pass

    def delete(self, key: str):
        """
        Remove a key from the store.

        Args:
            key: The key to delete
        """
        pass

    def create_snapshot(self):
        """
        Create snapshot and compact WAL.

        Captures current state of the store to enable WAL compaction.
        """
        pass

    def close(self):
        """
        Gracefully close the store.
        """
        pass


# --- Test harness (do not modify) ---
def test_basic_put_get():
    import tempfile
    import os
    store = DurableKVStore(tempfile.mkdtemp())
    store.put('key1', 'value1')
    result = store.get('key1')
    store.close()
    return str(result) if result is not None else "None"


def test_update_key():
    import tempfile
    store = DurableKVStore(tempfile.mkdtemp())
    store.put('key1', 'old')
    store.put('key1', 'new')
    result = store.get('key1')
    store.close()
    return str(result) if result is not None else "None"


def test_delete_key():
    import tempfile
    store = DurableKVStore(tempfile.mkdtemp())
    store.put('key1', 'value1')
    store.delete('key1')
    result = store.get('key1')
    store.close()
    return str(result) if result is not None else "None"


def test_get_nonexistent():
    import tempfile
    store = DurableKVStore(tempfile.mkdtemp())
    result = store.get('nonexistent')
    store.close()
    return str(result) if result is not None else "None"


def test_multiple_keys():
    import tempfile
    import json
    store = DurableKVStore(tempfile.mkdtemp())
    store.put('a', '1')
    store.put('b', '2')
    store.put('c', '3')
    result = {'a': store.get('a'), 'b': store.get('b'), 'c': store.get('c')}
    store.close()
    return json.dumps(result, sort_keys=True)


def test_snapshot_recovery():
    import tempfile
    import json
    store = DurableKVStore(tempfile.mkdtemp())
    store.put('key1', 'value1')
    store.create_snapshot()
    store.put('key2', 'value2')
    result = {'key1': store.get('key1'), 'key2': store.get('key2')}
    store.close()
    return json.dumps(result, sort_keys=True)


# --- Run tests ---
if __name__ == "__main__":
    tests = [
        {"name": "Basic put and get", "run": test_basic_put_get, "expected": "value1"},
        {"name": "Update existing key", "run": test_update_key, "expected": "new"},
        {"name": "Delete key", "run": test_delete_key, "expected": "None"},
        {"name": "Get non-existent key", "run": test_get_nonexistent, "expected": "None"},
        {"name": "Multiple keys", "run": test_multiple_keys, "expected": '{"a": "1", "b": "2", "c": "3"}'},
        {"name": "Snapshot and recovery", "run": test_snapshot_recovery, "expected": '{"key1": "value1", "key2": "value2"}'},
    ]
    passed = 0
    for t in tests:
        try:
            result = t["run"]()
        except Exception as e:
            result = f"ERROR: {e}"
        ok = result == t["expected"]
        passed += ok
        status = "PASS" if ok else "FAIL"
        print(f" {status} {t['name']}")
        if not ok:
            print(f" expected: {t['expected']}")
            print(f" got: {result}")
    print(f"\n{passed}/{len(tests)} tests passed")