Analysis Coding
Info
This section is currently under development. The example provided is simple and intended to test our infrastructure.
This example can be used as entrypoint.py, which is referenced in this documentation.
Example Analysis using StarModel: Counting Patients Using a FHIR Query
This example counts the patients on each node that holds FHIR data and then sums the per-node counts into a single aggregated total.
```python
from flame.star import StarModel, StarAnalyzer, StarAggregator


class MyAnalyzer(StarAnalyzer):
    def __init__(self, flame):
        """
        Initializes the custom Analyzer node.

        :param flame: Instance of FlameCoreSDK to interact with the FLAME components.
        """
        super().__init__(flame)  # Connects this analyzer to the FLAME components

    def analysis_method(self, data, aggregator_results):
        """
        Performs analysis on the retrieved data from data sources.

        :param data: A list of dictionaries containing the data from each data source.
                     - Each dictionary corresponds to a data source.
                     - Keys are the queries executed, and values are the results (dict for FHIR, str for S3).
        :param aggregator_results: Results from the aggregator in previous iterations.
                                   - None in the first iteration.
                                   - Contains the result from the aggregator's aggregation_method in subsequent iterations.
        :return: Any result of your analysis on one node (e.g., patient count).
        """
        # TODO: Implement your analysis method
        #  In this example we take the result of the FHIR query from the first data source
        #  and read the total number of patients from it.
        patient_count = float(data[0]['Patient?_summary=count']['total'])
        return patient_count


class MyAggregator(StarAggregator):
    def __init__(self, flame):
        """
        Initializes the custom Aggregator node.

        :param flame: Instance of FlameCoreSDK to interact with the FLAME components.
        """
        super().__init__(flame)  # Connects this aggregator to the FLAME components

    def aggregation_method(self, analysis_results):
        """
        Aggregates the results received from all analyzer nodes.

        :param analysis_results: A list of analysis results from each analyzer node.
        :return: The aggregated result (e.g., total patient count across all analyzers).
        """
        # TODO: Implement your aggregation method
        #  In this example we sum up the patient counts across all nodes.
        total_patient_count = sum(analysis_results)
        return total_patient_count

    def has_converged(self, result, last_result, num_iterations):
        """
        Determines if the aggregation process has converged.

        :param result: The current aggregated result.
        :param last_result: The aggregated result from the previous iteration.
        :param num_iterations: The number of iterations completed so far.
        :return: True if the aggregation has converged; False to continue iterations.
        """
        # TODO (optional): if the parameter 'simple_analysis' in 'StarModel' is set to False,
        #  this function defines the exit criteria in a multi-iterative analysis (otherwise ignored)
        return True  # Return True to indicate convergence in this simple analysis


def main():
    """
    Sets up and initiates the distributed analysis using the FLAME components.

    - Defines the custom analyzer and aggregator classes.
    - Specifies the type of data and queries to execute.
    - Configures analysis parameters like iteration behavior and output format.
    """
    StarModel(
        analyzer=MyAnalyzer,             # Custom analyzer class (must inherit from StarAnalyzer)
        aggregator=MyAggregator,         # Custom aggregator class (must inherit from StarAggregator)
        data_type='fhir',                # Type of data source ('fhir' or 's3')
        query='Patient?_summary=count',  # Query or list of queries to retrieve data
        simple_analysis=True,            # True for single-iteration; False for multi-iterative analysis
        output_type='str',               # Output format for the final result ('str', 'bytes', or 'pickle')
        analyzer_kwargs=None,            # Additional keyword arguments for the custom analyzer constructor (i.e. MyAnalyzer)
        aggregator_kwargs=None           # Additional keyword arguments for the custom aggregator constructor (i.e. MyAggregator)
    )


if __name__ == "__main__":
    main()
```

Explanation
- `MyAnalyzer`: Custom class created by the user for analysis (has to inherit from `StarAnalyzer` and has to implement `analysis_method()`).
  - `analysis_method()`: Custom function processing/analyzing the nodes' data according to the user's specifications. [In Example: Returns the patient count.]
    - Input parameters given by `StarModel`:
      - `data`: Contains the input data in either S3 or FHIR format (depending on the datastore and the `data_type` specified in the `StarModel` instantiation). It is a list of Python dictionaries, with each dictionary corresponding to one data source within the node's datastore (often only one). Each dictionary is keyed by the query or queries specified in the `StarModel` instantiation (`query`). For S3 data, those queries equate to the dataset filenames; for FHIR, they equate to the FHIR queries. If `query=None` is specified, for S3 all available datasets are returned using their filenames as keys, while for FHIR nothing is returned (i.e. FHIR datasets require a query to return anything).
        General structure: `data = [{<query_1_1>: <dataset_1_1>, ..., <query_n_1>: <dataset_n_1>}, ..., {<query_1_n>: <dataset_1_n>, ..., <query_n_n>: <dataset_n_n>}]`
        [In Example (for a single data source and a single query): `data = [{'Patient?_summary=count': {'total': 10}}]`, showing only the `total` field of the FHIR result that `analysis_method()` reads.]
      - `aggregator_results`: Contains the output of the previous iteration's `aggregation_method()` (only used in multi-iterative analyses). It can and should be used to compare results or calculate deltas from previous iterations. [In Example: `aggregator_results=None`]
- `MyAggregator`: Custom class created by the user for aggregation (has to inherit from `StarAggregator` and has to implement `aggregation_method()` and `has_converged()`).
  - `aggregation_method()`: Combines the results submitted by the nodes. [In Example: Sums the nodes' respective patient counts.]
    - Input parameters given by `StarModel`:
      - `analysis_results`: Contains the results of all `analysis_method()` executions by the analyzer nodes. It is a simple list of those results, i.e. it retains no information about which node sent which result. [In Example: Simple list of node patient counts.]
  - `has_converged()`: Method returning a boolean value, specifiable by the user. If it returns `True`, a multi-iterative analysis submits its final results to the Hub and terminates its own and all analyzer nodes' executions; otherwise it returns the aggregated results to the analyzer nodes for the next iteration. This method is only executed if `StarModel` was initialized with `simple_analysis=False`, and then only from the second iteration onward. [In Example: Returns `True`, but is ignored since `simple_analysis=True` in the `StarModel` instantiation in `main()`, i.e. implying a single-iteration analysis.]
    - Input parameters given by `StarModel`:
      - `result`: Output of the current iteration's `aggregation_method()`.
      - `last_result`: Output of the previous iteration's `aggregation_method()`.
      - `num_iterations`: Number of iterations executed. This number is incremented after executing the `has_converged()` check, i.e. it equals 1 in the second iteration of the analysis.
- `main()` function: Instantiates the `StarModel` class, which automatically executes the analysis on the node (either as an aggregator or as an analyzer node).
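To make the data flow described above concrete, the following is a minimal local sketch that mimics one analysis round without the FLAME SDK. The plain functions, the three made-up node inputs, and the assumption that each FHIR result is a dictionary with a `total` field are illustrative only; in a real analysis, `StarModel` supplies `data` and collects `analysis_results` itself.

```python
# Local stand-ins for MyAnalyzer.analysis_method and MyAggregator.aggregation_method.
# These are plain functions for illustration, not part of the FLAME SDK.

QUERY = 'Patient?_summary=count'


def analysis_method(data, aggregator_results=None):
    # data: list with one dictionary per data source, keyed by query.
    # Only the first data source is read, as in the example above.
    return float(data[0][QUERY]['total'])


def aggregation_method(analysis_results):
    # analysis_results: flat list of per-node results, with no node identity attached.
    return sum(analysis_results)


# Hypothetical inputs for three nodes, each with a single FHIR data source.
node_inputs = [
    [{QUERY: {'total': 10}}],
    [{QUERY: {'total': 25}}],
    [{QUERY: {'total': 7}}],
]

# Each node runs the analysis on its own data; aggregator_results is None
# because a simple analysis has only one iteration.
analysis_results = [analysis_method(data, None) for data in node_inputs]

total_patient_count = aggregation_method(analysis_results)
print(analysis_results)      # [10.0, 25.0, 7.0]
print(total_patient_count)   # 42.0
```

Running the two methods locally like this can be a quick way to check their logic before submitting the analysis, since neither depends on which node produced which result.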
The `entrypoint.py` script shown in this section serves as a basic "Hello World" example for performing federated analysis on FHIR data.
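For a multi-iterative analysis (`simple_analysis=False`), `has_converged()` decides when to stop. The sketch below simulates that control flow locally, following the behavior described in the explanation: the check is skipped in the first iteration, and `num_iterations` is incremented after each check. The `run()` driver, the tolerance, the iteration cap, and the simplified analysis step (one number per node instead of the `data` list) are all assumptions made for illustration, not the SDK's implementation.

```python
TOLERANCE = 0.01      # hypothetical stopping threshold
MAX_ITERATIONS = 50   # hypothetical safety cap


def analysis_method(local_value, aggregator_results):
    # In the first iteration there is no aggregated result yet, so start from 0.0.
    previous = 0.0 if aggregator_results is None else aggregator_results
    # Move halfway from the previous global estimate toward the local value.
    return previous + 0.5 * (local_value - previous)


def aggregation_method(analysis_results):
    # Average the per-node estimates.
    return sum(analysis_results) / len(analysis_results)


def has_converged(result, last_result, num_iterations):
    # Stop when the aggregated result barely changes, or when the cap is reached.
    return abs(result - last_result) < TOLERANCE or num_iterations >= MAX_ITERATIONS


def run(local_values):
    # Hypothetical local driver standing in for the iteration loop.
    last_result = None
    num_iterations = 0
    while True:
        results = [analysis_method(v, last_result) for v in local_values]
        result = aggregation_method(results)
        # No convergence check in the first iteration (no previous result exists).
        if last_result is not None and has_converged(result, last_result, num_iterations):
            return result, num_iterations
        last_result = result   # fed back to the analyzers as aggregator_results
        num_iterations += 1


final_result, iterations = run([10.0, 20.0, 30.0])
print(final_result, iterations)
```

Including an iteration cap alongside the tolerance check is a design choice that guarantees termination even if the aggregated result never stabilizes.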