Local Testing in FLAME: Star Pattern with Differential Privacy
INFO
For an explanation of differential privacy concepts and the Star pattern, please refer to the Star Pattern Testing Guide and Differential Privacy Documentation.
1. Initializing Local Testing with Differential Privacy
This example demonstrates how to test federated learning with Local Differential Privacy locally before deployment. Unlike regular Star pattern testing, this adds calibrated noise to aggregated results to prevent reverse-engineering of individual node contributions.
```python
from typing import Any, Optional

from flame.star import StarModelTester, StarAnalyzer, StarAggregator


class MyAnalyzer(StarAnalyzer):
    def __init__(self, flame):
        super().__init__(flame)

    def analysis_method(self, data, aggregator_results):
        self.flame.flame_log(f"\tAggregator results in MyAnalyzer: {aggregator_results}", log_type='debug')
        analysis_result = sum(data) / len(data) \
            if aggregator_results is None \
            else (sum(data) / len(data) + aggregator_results) + 1 / 2
        self.flame.flame_log(f"MyAnalysis result ({self.id}): {analysis_result}", log_type='notice')
        return analysis_result


class MyAggregator(StarAggregator):
    def __init__(self, flame):
        super().__init__(flame)

    def aggregation_method(self, analysis_results: list[Any]) -> Any:
        self.flame.flame_log(f"\tAnalysis results in MyAggregator: {analysis_results}", log_type='notice')
        result = sum(analysis_results) / len(analysis_results)
        self.flame.flame_log(f"MyAggregator result ({self.id}): {result}", log_type='notice')
        return result

    def has_converged(self, result: Any, last_result: Optional[Any]) -> bool:
        self.flame.flame_log(f"\tLast result: {last_result}, Current result: {result}", log_type="notice")
        self.flame.flame_log(f"\tChecking convergence at iteration {self.num_iterations}", log_type="notice")
        return self.num_iterations >= 5  # Limit to 5 iterations for testing


if __name__ == "__main__":
    data_1 = [1, 2, 3, 4]
    data_2 = [5, 6, 7, 8]
    data_splits = [data_1, data_2]

    StarModelTester(
        data_splits=data_splits,   # List of node-local datasets
        analyzer=MyAnalyzer,       # Custom Analyzer class
        aggregator=MyAggregator,   # Custom Aggregator class
        data_type='s3',            # Data source type
        simple_analysis=False,     # Enable multi-iteration training
        epsilon=1,                 # Privacy budget
        sensitivity=1              # Sensitivity parameter (10**0 = 1)
    )
```

1.1. Data Preparation:
```python
data_1 = [1, 2, 3, 4]
data_2 = [5, 6, 7, 8]
data_splits = [data_1, data_2]
```

This creates a simple data structure for testing:

- `data_1`: Simulates data from Node 1 (e.g., Hospital A)
- `data_2`: Simulates data from Node 2 (e.g., Hospital B)
- `data_splits`: List where each element represents one node's local data
Why this structure?
- Mimics federated deployments where each node has isolated data
- Simple numeric lists make it easy to verify differential privacy noise
- Each node processes data independently before aggregation
1.2. StarModelTester Configuration with Differential Privacy:
| Parameter | Value | Purpose |
|---|---|---|
| `data_splits` | `[data_1, data_2]` | Two nodes with local data |
| `analyzer` | `MyAnalyzer` | Local analysis logic at each node |
| `aggregator` | `MyAggregator` | Global aggregation logic |
| `data_type` | `'s3'` | Treats data as direct objects |
| `simple_analysis` | `False` | Enables multi-round (iterative) federated learning |
| `epsilon` | `1` | Privacy budget (moderate privacy protection) |
| `sensitivity` | `1` | Sensitivity: max individual contribution (10**0 = 1) |
Key Difference from Regular Star Pattern:
- The `epsilon` and `sensitivity` parameters enable differential privacy
- Laplace noise is automatically added to aggregated results
- Noise scale = `sensitivity / epsilon = 1 / 1 = 1.0`
2. Understanding the Analysis Logic
2.1. MyAnalyzer Class
```python
def analysis_method(self, data, aggregator_results):
    analysis_result = sum(data) / len(data) \
        if aggregator_results is None \
        else (sum(data) / len(data) + aggregator_results) + 1 / 2
    return analysis_result
```

Iteration 0 (First Round):
- `aggregator_results` is `None`
- Node 1: `sum([1,2,3,4]) / 4 = 2.5`
- Node 2: `sum([5,6,7,8]) / 4 = 6.5`
Subsequent Iterations:
- Uses feedback from previous aggregation
- Blends local average with global result
- Formula: `(local_avg + global_result) + 0.5`
Example for Node 1 in Iteration 1:
```python
local_avg = 2.5
global_result = 4.5                   # from Iteration 0, possibly with noise
analysis_result = (2.5 + 4.5) + 0.5   # = 7.5
```

2.2. MyAggregator Class
```python
def aggregation_method(self, analysis_results):
    result = sum(analysis_results) / len(analysis_results)
    return result
```

Iteration 0 Example:
- Receives: `[2.5, 6.5]` from both analyzers
- Computes: `(2.5 + 6.5) / 2 = 4.5` (true result)
- DP noise added: the released result becomes `4.5 + noise` (e.g., `4.523891`). Note that in this local test the noise is only applied to the final iteration's result; see Section 3.4
- Returns the (noisy) result to the nodes
How Differential Privacy Works:
- Aggregator computes the true result: `4.5`
- System samples noise: `Laplace(scale=1.0)` → e.g., `+0.024`
- The noisy result is sent to the Hub and back to the analyzers: `4.524` (a minimal sketch of this step follows below)
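The noise injection itself is handled by the framework (via the `opendp` library listed under prerequisites). Purely as an illustration of the mechanism, not FLAME's internal code, the step can be sketched with `numpy` (assumed available here):

```python
import numpy as np

epsilon = 1.0
sensitivity = 1.0
scale = sensitivity / epsilon          # noise scale b = 1.0

true_result = 4.5                      # exact aggregate from Iteration 0
noise = np.random.laplace(loc=0.0, scale=scale)
noisy_result = true_result + noise     # e.g. 4.524 for a draw of +0.024

print(f"true={true_result}, noise={noise:+.3f}, released={noisy_result:.3f}")
```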
2.3. Convergence Logic
```python
def has_converged(self, result, last_result):
    return self.num_iterations >= 5
```

Simple iteration-based stopping:
- Stops after 5 iterations
- More sophisticated approaches could check the change between iterations, e.g. `abs(result - last_result) < threshold`
- But the threshold must account for DP noise (see the sketch below)
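A minimal sketch of such a noise-aware check; the threshold choice below is an assumption for illustration, not a FLAME recommendation:

```python
def has_converged(self, result, last_result):
    if last_result is None:
        return False
    # Results carry Laplace(scale = sensitivity / epsilon) noise, so pick a
    # threshold comfortably above the typical noise magnitude -- here a few
    # multiples of the noise scale (scale = 1.0 in this example).
    noise_scale = 1.0
    threshold = 5 * noise_scale
    # Stop on a small change, or fall back to a hard iteration cap.
    return abs(result - last_result) < threshold or self.num_iterations >= 10
```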
3. Running the Example
3.1. Prerequisites
Project structure:
```
test/
└── test_star_pattern_dp.py
```

Dependencies:

- `flame` package with Star pattern support
- `opendp` library (for Laplace noise generation)
3.2. Running the Test
```bash
cd test/
python test_star_pattern_dp.py
```

3.3. Expected Output
```
--- Starting Iteration 0 ---
Analyzer node_0 started
Data extracted: [1, 2, 3, 4]
Aggregator results in MyAnalyzer: None
MyAnalysis result (node_0): 2.5
Analyzer node_1 started
Data extracted: [5, 6, 7, 8]
Aggregator results in MyAnalyzer: None
MyAnalysis result (node_1): 6.5
Aggregator started
Analysis results in MyAggregator: [2.5, 6.5]
MyAggregator result (node_2): 4.5
Last result: None, Current result: 4.5
Checking convergence at iteration 0
Aggregated results: 4.5
--- Ending Iteration 0 ---
--- Starting Iteration 1 ---
Analyzer node_0 started
Data extracted: [1, 2, 3, 4]
Aggregator results in MyAnalyzer: 4.5
MyAnalysis result (node_0): 7.5
Analyzer node_1 started
Data extracted: [5, 6, 7, 8]
Aggregator results in MyAnalyzer: 4.5
MyAnalysis result (node_1): 11.5
Aggregator started
Analysis results in MyAggregator: [7.5, 11.5]
MyAggregator result (node_2): 9.5
Last result: 4.5, Current result: 9.5
Checking convergence at iteration 1
Aggregated results: 9.5
Last result: 4.5, Current result: 9.5
Checking convergence at iteration 1
--- Ending Iteration 1 ---
--- Starting Iteration 2 ---
Analyzer node_0 started
Data extracted: [1, 2, 3, 4]
Aggregator results in MyAnalyzer: 9.5
MyAnalysis result (node_0): 12.5
Analyzer node_1 started
Data extracted: [5, 6, 7, 8]
Aggregator results in MyAnalyzer: 9.5
MyAnalysis result (node_1): 16.5
Aggregator started
Analysis results in MyAggregator: [12.5, 16.5]
MyAggregator result (node_2): 14.5
Last result: 9.5, Current result: 14.5
Checking convergence at iteration 2
Aggregated results: 14.5
Last result: 9.5, Current result: 14.5
Checking convergence at iteration 2
--- Ending Iteration 2 ---
--- Starting Iteration 3 ---
Analyzer node_0 started
Data extracted: [1, 2, 3, 4]
Aggregator results in MyAnalyzer: 14.5
MyAnalysis result (node_0): 17.5
Analyzer node_1 started
Data extracted: [5, 6, 7, 8]
Aggregator results in MyAnalyzer: 14.5
MyAnalysis result (node_1): 21.5
Aggregator started
Analysis results in MyAggregator: [17.5, 21.5]
MyAggregator result (node_2): 19.5
Last result: 14.5, Current result: 19.5
Checking convergence at iteration 3
Aggregated results: 19.5
Last result: 14.5, Current result: 19.5
Checking convergence at iteration 3
--- Ending Iteration 3 ---
--- Starting Iteration 4 ---
Analyzer node_0 started
Data extracted: [1, 2, 3, 4]
Aggregator results in MyAnalyzer: 19.5
MyAnalysis result (node_0): 22.5
Analyzer node_1 started
Data extracted: [5, 6, 7, 8]
Aggregator results in MyAnalyzer: 19.5
MyAnalysis result (node_1): 26.5
Aggregator started
Analysis results in MyAggregator: [22.5, 26.5]
MyAggregator result (node_2): 24.5
Last result: 19.5, Current result: 24.5
Checking convergence at iteration 4
Aggregated results: 24.5
Last result: 19.5, Current result: 24.5
Checking convergence at iteration 4
--- Ending Iteration 4 ---
--- Starting Iteration 5 ---
Analyzer node_0 started
Data extracted: [1, 2, 3, 4]
Aggregator results in MyAnalyzer: 24.5
MyAnalysis result (node_0): 27.5
Analyzer node_1 started
Data extracted: [5, 6, 7, 8]
Aggregator results in MyAnalyzer: 24.5
MyAnalysis result (node_1): 31.5
Aggregator started
Analysis results in MyAggregator: [27.5, 31.5]
MyAggregator result (node_2): 29.5
Last result: 24.5, Current result: 29.5
Checking convergence at iteration 5
Aggregated results: 29.5
Test mode: Would apply local DP with epsilon=1 and sensitivity=1
Last result: 24.5, Current result: 30.186171397668105
Checking convergence at iteration 5
Final result: 30.186171397668105
--- Ending Iteration 5 ---
```

3.4. Analyzing the Output
Key Observations:
Differential Privacy Noise in Action:

- True result (Iteration 5): `29.5`
- Noisy result: `30.186171397668105`
- Noise added: `~0.686`, drawn from the Laplace distribution (see the quick check below)

Noise Added at the Last Iteration:

- Only the final aggregated result has DP noise
- Intermediate results are exact averages

Privacy Preserved:

- Exact individual node contributions cannot be determined from the aggregated result
- The noise makes it infeasible to reverse-engineer the original data
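For context, a draw of this magnitude is unremarkable for Laplace noise with scale 1.0; a quick check (added here for illustration, not part of the test output):

```python
import math

# For Laplace(loc=0, scale=b): P(|noise| > t) = exp(-t / b)
b, t = 1.0, 0.686
print(math.exp(-t / b))  # ~0.50 -> about half of all draws are at least this large
```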
4. Best Practices for DP Testing
4.1. Start Without Privacy
```python
# Step 1: Test basic logic without DP
StarModelTester(..., simple_analysis=False)  # No epsilon/sensitivity

# Step 2: Add weak DP to verify it works
StarModelTester(..., epsilon=10, sensitivity=1)

# Step 3: Use realistic privacy levels
StarModelTester(..., epsilon=1, sensitivity=1)
```

4.2. Document Your Privacy Choices
"""
Privacy Parameters:
- epsilon = 1.0 (moderate privacy)
- sensitivity = 1.0 (assuming max individual contribution of 1)
- Total privacy budget: epsilon * num_iterations = 1.0 * 5 = 5.0
- Rationale: Balances utility with privacy for testing purposes
"""4.3. Validate Noise Impact (only during testing)
Add logging to track the noise (never do this in production):
```python
def aggregation_method(self, analysis_results):
    true_result = sum(analysis_results) / len(analysis_results)
    self.flame.flame_log(
        f"True result before DP noise: {true_result}",
        log_type='notice'
    )
    # DP noise is added automatically after this method returns
    return true_result
```

4.4. Test Multiple Runs
Since DP adds random noise, run multiple times:
```bash
for i in {1..10}; do
  echo "Run $i"
  python test_star_pattern_dp.py
done
```

Collect the results to understand the noise distribution and variability, for example with the sketch below.
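A rough helper for collecting the final result across repeated runs. It assumes the script prints a line containing `Final result: <number>`, as in the expected output above; adjust the pattern if your log format differs. Both stdout and stderr are searched, since where `flame_log` writes may vary.

```python
import re
import statistics
import subprocess

results = []
for i in range(10):
    proc = subprocess.run(
        ["python", "test_star_pattern_dp.py"],
        capture_output=True, text=True
    )
    match = re.search(r"Final result: (-?[\d.]+)", proc.stdout + proc.stderr)
    if match:
        results.append(float(match.group(1)))

true_result = 29.5  # exact aggregate in the last iteration of this example
noise = [r - true_result for r in results]
print("mean noise :", statistics.mean(noise))   # should be close to 0
print("noise stdev:", statistics.stdev(noise))  # Laplace(scale=1) has stdev sqrt(2) ~ 1.41
```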
5. Common Issues and Solutions
Issue 1: Results Too Noisy to Converge
Problem:
```python
epsilon=0.1  # Very strong privacy
# Noise is so large that the convergence criteria are never met
```

Solution:
```python
# Either increase epsilon (weaker privacy)
epsilon=1.0

# Or use iteration-based stopping
def has_converged(self, result, last_result):
    return self.num_iterations >= 10  # Don't rely on value changes
```

Issue 2: Privacy Budget Concerns
Problem: Multiple iterations consume privacy budget additively.
Solution:
```python
# Allocate budget per iteration
total_epsilon = 1.0
num_iterations = 5
epsilon_per_iteration = total_epsilon / num_iterations  # 0.2

# Or stop early to preserve budget
def has_converged(self, result, last_result):
    privacy_budget_used = self.num_iterations * self.epsilon
    return privacy_budget_used >= 1.0  # Stop when budget exhausted
```

Issue 3: Incorrect Sensitivity
Problem:
```python
sensitivity=1  # But the data actually ranges 0-100
# Privacy guarantee is compromised!
```

Solution:
```python
# Calculate a proper sensitivity
data_range = max_value - min_value    # 100 - 0 = 100
num_nodes = len(data_splits)          # 2
sensitivity = data_range / num_nodes  # 100 / 2 = 50

StarModelTester(..., sensitivity=50)
```
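Purely as an illustration, applying the same rule to this guide's toy data (values between 1 and 8, split across two nodes) would give:

```python
# Toy data from this guide: [1, 2, 3, 4] and [5, 6, 7, 8]
data_range = 8 - 1                    # 7
num_nodes = 2                         # len(data_splits)
sensitivity = data_range / num_nodes  # 3.5
```

This suggests the `sensitivity=1` used in the test above is a simplification chosen for demonstration rather than a strict bound.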
Issue 4: Non-Numeric Results

Problem:
```python
def aggregation_method(self, analysis_results):
    return {"mean": 4.5, "std": 1.2}  # Dicts are not supported!
```

Solution:
```python
def aggregation_method(self, analysis_results):
    return sum(analysis_results) / len(analysis_results)  # A single number only
```

6. Next Steps
6.1. Move to Production
After local testing succeeds:
- Replace `StarModelTester` with `StarLocalDPModel`:

  ```python
  from flame.star import StarLocalDPModel

  StarLocalDPModel(
      analyzer=MyAnalyzer,
      aggregator=MyAggregator,
      data_type='fhir',                # Real FHIR server
      query='Patient?_summary=count',
      simple_analysis=False,
      epsilon=1.0,
      sensitivity=1.0
  )
  ```

- Configure real data sources (FHIR servers, S3 buckets)
- Ensure no logging of true results in production!