From fff1b67e5d713d6d896f6d359674aa40549e1738 Mon Sep 17 00:00:00 2001 From: Jason Date: Mon, 11 Sep 2023 20:37:08 -0400 Subject: [PATCH] clean up eval.py --- examples/evals/eval.py | 158 +++++++++++++++++++++++++------- examples/evals/summary_stats.py | 132 -------------------------- 2 files changed, 123 insertions(+), 167 deletions(-) delete mode 100644 examples/evals/summary_stats.py diff --git a/examples/evals/eval.py b/examples/evals/eval.py index b00be32..3b8de28 100644 --- a/examples/evals/eval.py +++ b/examples/evals/eval.py @@ -1,11 +1,10 @@ -from collections import defaultdict -import json -import logging +from collections import Counter, defaultdict from enum import Enum -from pprint import pprint - +from typing import Any, Dict, Union, List +import numpy as np +import json from pydantic import ValidationError -from summary_stats import StreamingAccumulatorManager +from pprint import pprint import models as m @@ -15,46 +14,135 @@ class Status(Enum): VALIDATION_ERROR = "_validation_error_" -def process_line(eval_manager, line, index): - try: - obj = json.loads(line) - eval_manager.accumulator[Status.IS_JSON.value].update(index, True) +class StreamingAccumulatorManager: + def __init__(self): + self.accumulator = defaultdict(StreamingAccumulator) + def validate_string(self, json_string: str, index: int) -> None: try: - obj = m.MultiSearch.model_validate(obj) - eval_manager.update(index, obj.model_dump()) - eval_manager.accumulator[Status.IS_VALID.value].update(index, True) + obj = json.loads(json_string) + self.accumulator[Status.IS_JSON.value].update(index, True) + try: + # Replace this line with your validation logic + obj = m.MultiSearch.model_validate(obj) + self.update(index, obj) + self.accumulator[Status.IS_VALID.value].update(index, True) + except ValidationError as e: + self.accumulator[Status.IS_VALID.value].update(index, False) + self.process_validation_error(e, index) + except json.JSONDecodeError: + self.accumulator[Status.IS_JSON.value].update(index, False) - except ValidationError as e: - eval_manager.accumulator[Status.IS_VALID.value].update(index, False) - process_validation_error(eval_manager, e, index) - - except json.JSONDecodeError: - eval_manager.accumulator[Status.IS_JSON.value].update(index, False) - - -def process_validation_error(eval_manager, error, index): - for err in error.errors(): - path = ( - "$." - + ".".join( - [str(x) if not isinstance(x, int) else "[*]" for x in err["loc"]] + def process_validation_error(self, error, index): + for err in error.errors(): + path = ( + "$." + + ".".join( + [str(x) if not isinstance(x, int) else "[*]" for x in err["loc"]] + ) + + "." + + err["type"] ) - + "." - + err["type"] - ) - eval_manager.accumulator[Status.VALIDATION_ERROR.value].update(index, path) + self.accumulator[Status.VALIDATION_ERROR.value].update(index, path) + + def update(self, index, data: Any, path: str = "$") -> None: + if isinstance(data, dict): + for key, value in data.items(): + new_path = f"{path}.{key}" + self.update(index, value, new_path) + elif isinstance(data, list): + new_path = f"{path}[*]" + for value in data: + self.update(index, value, new_path) + length_path = f"{path}.length" + self.accumulator[length_path].update(index, len(data)) + elif isinstance(data, Enum): + enum_path = f"{path}.enum" + self.accumulator[enum_path].update(index, data.value) + elif path != "$": + pass + else: + self.accumulator[path].update(index, data) + + def summarize(self) -> Dict[str, Dict]: + return {k: v.summarize(key_name=k) for k, v in self.accumulator.items()} + + +class StreamingAccumulator: + def __init__(self): + self.counter = Counter() + self.min = float("inf") + self.max = float("-inf") + self.sum = 0 + self.squared_sum = 0 + self.unique_values = set() + self.missing_values = 0 + self.str_min_length = float("inf") + self.str_max_length = float("-inf") + self.str_sum_length = 0 + self.str_squared_sum_length = 0 + self.value = [] + self.str_length = [] + self.reverse_lookup = defaultdict(list) + + def update(self, index: Any, value: Any) -> None: + if isinstance(value, (int, str, bool)): + self.counter[value] += 1 + self.unique_values.add(value) + self.value.append(value) + self.reverse_lookup[value].append(index) + if value is None or value == "": + self.missing_values += 1 + return + if isinstance(value, (int, float)): + self.min = min(self.min, value) + self.max = max(self.max, value) + self.sum += value + self.squared_sum += value**2 + if isinstance(value, str): + str_len = len(value) + self.str_length.append(str_len) + self.str_min_length = min(self.str_min_length, str_len) + self.str_max_length = max(self.str_max_length, str_len) + self.str_sum_length += str_len + self.str_squared_sum_length += str_len**2 + + def summarize(self, key_name=None) -> Dict[str, Union[int, float, dict]]: + if key_name is None: + key_name = "" + n = sum(self.counter.values()) + summaries = {} + summaries["counter"] = self.counter + summaries["unique_count"] = len(self.unique_values) + summaries["missing_values"] = self.missing_values + summaries["_reverse_lookup"] = dict(self.reverse_lookup) + if n > 0: + if all(isinstance(value, (bool)) for value in self.unique_values): + summaries["mean"] = self.sum / n + return summaries + if all(isinstance(value, (int, float)) for value in self.unique_values): + summaries["min"] = self.min + summaries["max"] = self.max + summaries["mean"] = self.sum / n + summaries["std"] = np.sqrt(self.squared_sum / n - (self.sum / n) ** 2) + return summaries + if all(isinstance(value, str) for value in self.unique_values): + summaries["str_min_length"] = self.str_min_length + summaries["str_max_length"] = self.str_max_length + summaries["str_mean_length"] = self.str_sum_length / n + summaries["str_std_length"] = np.sqrt( + self.str_squared_sum_length / n - (self.str_sum_length / n) ** 2 + ) + return summaries + return summaries if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - eval_manager = StreamingAccumulatorManager() with open("test.jsonl") as f: lines = f.readlines() - for ii, line in enumerate(lines): - process_line(eval_manager, line, ii) + eval_manager.validate_string(line, ii) pprint(eval_manager.summarize()) diff --git a/examples/evals/summary_stats.py b/examples/evals/summary_stats.py deleted file mode 100644 index 34c8da6..0000000 --- a/examples/evals/summary_stats.py +++ /dev/null @@ -1,132 +0,0 @@ -# Modified StreamingAccumulator class with self.value and self.str_length as lists - -from collections import Counter, defaultdict -from enum import Enum -from typing import Any, Dict, Union, List -import numpy as np -from pydantic import BaseModel, Field - - -class StreamingAccumulator: - counter: Counter = Field(default_factory=Counter) - min: float = float("inf") - max: float = float("-inf") - sum: float = 0 - squared_sum: float = 0 - unique_values: set = Field(default_factory=set) - missing_values: int = 0 - str_min_length: float = float("inf") - str_max_length: float = float("-inf") - str_sum_length: float = 0 - str_squared_sum_length: float = 0 - value: List[Any] = Field(default_factory=list) # Added back as a list - str_length: List[int] = Field(default_factory=list) # Added back as a list - reverse_lookup: defaultdict = defaultdict(list) - - def __init__(self): - self.counter = Counter() - self.min = float("inf") - self.max = float("-inf") - self.sum = 0 - self.squared_sum = 0 - self.unique_values = set() - self.missing_values = 0 - self.str_min_length = float("inf") - self.str_max_length = float("-inf") - self.str_sum_length = 0 - self.str_squared_sum_length = 0 - self.value = [] - self.str_length = [] - self.reverse_lookup = defaultdict(list) - - def update(self, index: Any, value: Any) -> None: - """Update statistics with a new value.""" - - if isinstance(value, (int, str, bool)): - self.counter[value] += 1 - self.unique_values.add(value) - self.value.append(value) - self.reverse_lookup[value].append(index) - - if value is None or value == "": - self.missing_values += 1 - return - - if isinstance(value, (int, float)): - self.min = min(self.min, value) - self.max = max(self.max, value) - self.sum += value - self.squared_sum += value**2 - - if isinstance(value, str): - str_len = len(value) - self.str_length.append(str_len) # Append the string length to the list - self.str_min_length = min(self.str_min_length, str_len) - self.str_max_length = max(self.str_max_length, str_len) - self.str_sum_length += str_len - self.str_squared_sum_length += str_len**2 - - def summarize(self, key_name=None) -> Dict[str, Union[int, float, dict]]: - if key_name is None: - key_name = "" - - n = sum(self.counter.values()) - summaries = {} - summaries["counter"] = self.counter - summaries["unique_count"] = len(self.unique_values) - summaries["missing_values"] = self.missing_values - summaries["_reverse_lookup"] = dict(self.reverse_lookup) - - if n > 0: - if all(isinstance(value, (bool)) for value in self.unique_values): - summaries["mean"] = self.sum / n - return summaries - - if all(isinstance(value, (int, float)) for value in self.unique_values): - summaries["min"] = self.min - summaries["max"] = self.max - summaries["mean"] = self.sum / n - summaries["std"] = np.sqrt(self.squared_sum / n - (self.sum / n) ** 2) - return summaries - - if all( - isinstance(value, str) for value in self.unique_values - ) and not key_name.startswith("_"): - summaries["str_min_length"] = self.str_min_length - summaries["str_max_length"] = self.str_max_length - summaries["str_mean_length"] = self.str_sum_length / n - summaries["str_std_length"] = np.sqrt( - self.str_squared_sum_length / n - (self.str_sum_length / n) ** 2 - ) - return summaries - - return summaries - - -class StreamingAccumulatorManager: - def __init__(self): - self.accumulator = defaultdict(StreamingAccumulator) - - def update(self, index, data: Any, path: str = "$") -> None: - """Accumulate values from a nested object.""" - if isinstance(data, dict): - for key, value in data.items(): - new_path = f"{path}.{key}" - self.update(index, value, new_path) - elif isinstance(data, list): - new_path = f"{path}[*]" - for value in data: - self.update(index, value, new_path) - length_path = f"{path}.length" - self.accumulator[length_path].update(index, len(data)) - elif isinstance(data, Enum): - enum_path = f"{path}.enum" - self.accumulator[enum_path].update(index, data.value) - elif path != "$": - pass - else: - self.accumulator[path].update(index, data) - - def summarize(self) -> Dict[str, Dict]: - """Generate summary statistics for all paths.""" - return {k: v.summarize(key_name=k) for k, v in self.accumulator.items()}