Step-by-Step Guide for a Minimal Text Scoring Example Using FLAME
This script is a minimal example of parsing text files (here: doctor's letters) and returning arbitrary text statistics. It reads text data from S3, computes several readability metrics for each text, averages them on each node, aggregates the node averages across nodes, and outputs the global average readability scores.
Step 1: Imports and Setup
At the top of test_gemtex.py, import the necessary modules:
    from flame.star import StarModel, StarAnalyzer, StarAggregator
    from readability import Readability

- StarModel, StarAnalyzer, StarAggregator: FLAME SDK components.
- Readability: Library for computing readability scores. This library is only included in the GemTeX master image; it is not part of other master images by default.
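Because the readability library is only available in some master images, it can help to check for it before running the analysis. The following is a minimal sketch using only the Python standard library; `module_available` is a hypothetical helper name, and the check only tells you that some module called `readability` can be imported, not which package provides it.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if a top-level module with this name can be located."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    if module_available("readability"):
        print("readability found: the analyzer can compute its scores.")
    else:
        print("readability not found: check which master image is in use.")
```

Running this inside the image you plan to use gives a quick answer before any data is fetched.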
Step 2: Implement the Analyzer
The MyAnalyzer class inherits from StarAnalyzer and must override the abstract method analysis_method:
    class MyAnalyzer(StarAnalyzer):
        def __init__(self, flame):
            super().__init__(flame)

analysis_method(self, data, aggregator_results):
- Receives data: a list where each element is a dictionary mapping S3 object keys to raw bytes. Which data is passed to this method depends on your project.
- Decodes each bytes value to UTF-8 text.
- Creates a Readability object to compute seven metrics:
  - Flesch–Kincaid grade level
  - Flesch reading ease
  - Gunning fog index
  - Coleman–Liau index
  - Dale–Chall score
  - Automated Readability Index (ARI)
  - Linsear Write formula
- Collects these metrics for each text into scores.
- Calculates avg_scores_node: the average of each metric across all texts on that node.
- Returns a list of 7 average scores.
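The per-node averaging described above works by regrouping the per-text score lists by metric with `zip(*scores)`. The standalone sketch below illustrates only that step; the numbers are made up, only two metrics are used for brevity, and `average_columns` is a hypothetical helper name that does not appear in the script.

```python
def average_columns(rows):
    """Average each column of a list of equal-length rows."""
    # zip(*rows) regroups the values so that each tuple holds one metric
    # across all texts.
    return [sum(column) / len(column) for column in zip(*rows)]


# Made-up scores for three texts and two metrics (not real readability values).
scores = [
    [10.0, 60.0],
    [12.0, 50.0],
    [14.0, 40.0],
]

avg_scores_node = average_columns(scores)
print(avg_scores_node)  # [12.0, 50.0]
```

Note that an empty list of rows yields an empty result rather than an error, because `zip()` with no arguments produces nothing to average.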
    def analysis_method(self, data, aggregator_results):
        scores = []
        for text in data[0].values():
            text = text.decode("utf-8")
            r = Readability(text)
            a1 = r.flesch_kincaid().score
            a2 = r.flesch().score
            a3 = r.gunning_fog().score
            a4 = r.coleman_liau().score
            a5 = r.dale_chall().score
            a6 = r.ari().score
            a7 = r.linsear_write().score
            scores.append([a1, a2, a3, a4, a5, a6, a7])
        avg_scores_node = [sum(group) / len(group) for group in zip(*scores)]
        return avg_scores_node

Step 3: Implement the Aggregator
The MyAggregator class inherits from StarAggregator and must override the abstract method aggregation_method:
    class MyAggregator(StarAggregator):
        def __init__(self, flame):
            super().__init__(flame)

aggregation_method(self, analysis_results):
- Receives analysis_results: a list of lists, where each inner list is the output of one node's analysis_method.
- Computes avg_scores_global: the average of each metric across all nodes.
- Returns a list of 7 globally averaged scores.

has_converged(self, result, last_result):
- Always returns True, so the model runs only one iteration.
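The aggregation described above averages the node averages with equal weight per node, so a node holding one text counts as much as a node holding many. The sketch below contrasts that with a count-weighted mean. It is an illustration only: the script's analyzer returns just the averages, so `node_counts` is a hypothetical extra value each node would have to report, and both function names are invented for this example.

```python
def unweighted_mean(node_averages):
    """Average each metric across nodes, one vote per node."""
    return [sum(col) / len(col) for col in zip(*node_averages)]


def weighted_mean(node_averages, node_counts):
    """Average each metric across nodes, weighted by texts per node."""
    total = sum(node_counts)
    return [
        sum(avg * n for avg, n in zip(col, node_counts)) / total
        for col in zip(*node_averages)
    ]


# Made-up node averages for two metrics, and made-up text counts per node.
node_averages = [[10.0, 60.0], [20.0, 40.0]]
node_counts = [1, 3]

print(unweighted_mean(node_averages))             # [15.0, 50.0]
print(weighted_mean(node_averages, node_counts))  # [17.5, 45.0]
```

When all nodes hold the same number of texts, the two results coincide.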
    def aggregation_method(self, analysis_results):
        avg_scores_global = [sum(group) / len(group) for group in zip(*analysis_results)]
        return avg_scores_global

    def has_converged(self, result, last_result):
        return True

Step 4: Configure and Run StarModel
The main function sets up and runs the model:
    def main():
        StarModel(
            analyzer=MyAnalyzer,
            aggregator=MyAggregator,
            data_type='s3',
            simple_analysis=True,
            output_type='str',
            analyzer_kwargs=None,
            aggregator_kwargs=None
        )

    if __name__ == "__main__":
        main()

- data_type: 's3' tells FLAME to fetch data from S3.
- simple_analysis: True for a single iteration.
- output_type: 'str' specifies the format of the returned result.
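In this example has_converged always returns True and the analysis runs once. For an iterative analysis, a convergence check would typically compare the current and previous aggregated results. The sketch below shows one possible check as a plain function; `scores_converged` and the tolerance `tol` are hypothetical, and treating a missing previous result as `None` is an assumption about how the first iteration is signalled.

```python
def scores_converged(result, last_result, tol=1e-3):
    """Hypothetical check: True when no score moved by more than tol."""
    if last_result is None:
        # Assumption: no previous result means there is nothing to compare yet.
        return False
    return all(abs(a - b) <= tol for a, b in zip(result, last_result))


print(scores_converged([1.0, 2.0], None))           # False
print(scores_converged([1.0, 2.0], [1.0, 2.0005]))  # True
print(scores_converged([1.0, 2.0], [1.0, 2.5]))     # False
```

A custom has_converged could delegate to a function like this instead of returning True unconditionally.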
The script will:
- Fetch all text files from configured S3 sources.
- Compute readability metrics on each node.
- Aggregate results and print a global list of seven average scores.
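Since the result is a bare list of seven numbers, their meaning depends entirely on the order in which the analyzer appends them. A small sketch of how the list could be labelled for reading is shown below; `METRIC_NAMES` and `label_scores` are hypothetical names, the order follows the calls in analysis_method, and the input values are placeholders rather than real scores.

```python
# Order follows the a1..a7 assignments in analysis_method.
METRIC_NAMES = [
    "flesch_kincaid",
    "flesch",
    "gunning_fog",
    "coleman_liau",
    "dale_chall",
    "ari",
    "linsear_write",
]


def label_scores(scores):
    """Pair each score with its metric name; reject lists of the wrong length."""
    if len(scores) != len(METRIC_NAMES):
        raise ValueError(f"expected {len(METRIC_NAMES)} scores, got {len(scores)}")
    return dict(zip(METRIC_NAMES, scores))


# Placeholder values only, to show the shape of the labelled result.
labelled = label_scores([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0])
print(labelled["gunning_fog"])  # 3.0
```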
Put together, the complete script looks like this:
    from flame.star import StarModel, StarAnalyzer, StarAggregator
    from readability import Readability


    class MyAnalyzer(StarAnalyzer):
        def __init__(self, flame):
            """
            Initializes the custom Analyzer node.
            :param flame: Instance of FlameCoreSDK to interact with the FLAME components.
            """
            super().__init__(flame)  # Connects this analyzer to the FLAME components

        def analysis_method(self, data, aggregator_results):
            """
            Performs analysis on the retrieved data from data sources.
            :param data: A list of dictionaries containing the data from each data source.
                - Each dictionary corresponds to a data source.
                - Keys are the queries executed, and values are the results (dict for FHIR; for S3, the raw bytes decoded below).
            :param aggregator_results: Results from the aggregator in previous iterations.
                - None in the first iteration.
                - Contains the result from the aggregator's aggregation_method in subsequent iterations.
            :return: Any result of your analysis on one node (e.g. patient count).
            """
            scores = []
            for text in data[0].values():
                text = text.decode("utf-8")
                r = Readability(text)
                a1 = r.flesch_kincaid().score
                a2 = r.flesch().score
                a3 = r.gunning_fog().score
                a4 = r.coleman_liau().score
                a5 = r.dale_chall().score
                a6 = r.ari().score
                a7 = r.linsear_write().score
                scores.append([a1, a2, a3, a4, a5, a6, a7])
            # averaging individual scores across all texts on this node
            avg_scores_node = [sum(group) / len(group) for group in zip(*scores)]
            return avg_scores_node

    class MyAggregator(StarAggregator):
        def __init__(self, flame):
            """
            Initializes the custom Aggregator node.
            :param flame: Instance of FlameCoreSDK to interact with the FLAME components.
            """
            super().__init__(flame)  # Connects this aggregator to the FLAME components

        def aggregation_method(self, analysis_results):
            """
            Aggregates the results received from all analyzer nodes.
            :param analysis_results: A list of analysis results from each analyzer node.
            :return: The aggregated result (e.g., total patient count across all analyzers).
            """
            # averaging individual scores across all nodes
            avg_scores_global = [sum(group) / len(group) for group in zip(*analysis_results)]
            return avg_scores_global

        def has_converged(self, result, last_result):
            """
            Determines if the aggregation process has converged.
            :param result: The current aggregated result.
            :param last_result: The aggregated result from the previous iteration.
            :return: True if the aggregation has converged; False to continue iterations.
            """
            return True  # Return True to indicate convergence in this simple analysis

    def main():
        """
        Sets up and initiates the distributed analysis using the FLAME components.
        - Defines the custom analyzer and aggregator classes.
        - Specifies the type of data and queries to execute.
        - Configures analysis parameters like iteration behavior and output format.
        """
        StarModel(
            analyzer=MyAnalyzer,  # Custom analyzer class (must inherit from StarAnalyzer)
            aggregator=MyAggregator,  # Custom aggregator class (must inherit from StarAggregator)
            data_type='s3',  # Type of data source ('fhir' or 's3')
            # query='Patient?_summary=count',  # Query or list of queries to retrieve data
            simple_analysis=True,  # True for single-iteration; False for multi-iterative analysis
            output_type='str',  # Output format for the final result ('str', 'bytes', or 'pickle')
            analyzer_kwargs=None,  # Additional keyword arguments for the custom analyzer constructor (i.e. MyAnalyzer)
            aggregator_kwargs=None  # Additional keyword arguments for the custom aggregator constructor (i.e. MyAggregator)
        )


    if __name__ == "__main__":
        main()
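To check the data flow of the script above without a FLAME deployment or the readability library, the analyzer and aggregator logic can be mirrored with plain functions. The sketch below is only a stand-in: `score_text` is a hypothetical scorer that returns word count and mean word length instead of real readability metrics, the object keys and texts are invented, and `analyze_node` and `aggregate` are illustrative names that only imitate the shape of analysis_method and aggregation_method.

```python
def score_text(text):
    """Stand-in scorer (NOT a readability metric): word count, mean word length."""
    words = text.split()
    return [float(len(words)), sum(len(w) for w in words) / len(words)]


def analyze_node(data):
    """Mirror analysis_method: decode bytes, score each text, average per metric."""
    scores = [score_text(raw.decode("utf-8")) for raw in data[0].values()]
    return [sum(col) / len(col) for col in zip(*scores)]


def aggregate(analysis_results):
    """Mirror aggregation_method: average each metric across nodes."""
    return [sum(col) / len(col) for col in zip(*analysis_results)]


# Invented S3-style payloads: one dict per data source, object key -> raw bytes.
node_a = [{"letter1.txt": b"aa bb", "letter2.txt": b"aaaa bbbb cccc dddd"}]
node_b = [{"letter3.txt": b"aaaaa bbbbb ccccc ddddd eeeee"}]

node_results = [analyze_node(node_a), analyze_node(node_b)]
print(node_results)             # [[3.0, 3.0], [5.0, 5.0]]
print(aggregate(node_results))  # [4.0, 4.0]
```

Swapping `score_text` for the seven Readability calls gives the same structure as the real analyzer, with a list of seven values per text instead of two.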