refactor(batch-classification,extract-table): simplify code, improve functionalities, introduce langsmith library (#442)

This commit is contained in:
Jason Liu
2024-02-18 11:50:09 -05:00
committed by GitHub
parent 66a8285421
commit 5709a4a524
5 changed files with 334 additions and 159 deletions
+31 -88
View File
@@ -1,4 +1,3 @@
import json
import instructor
import asyncio
@@ -6,96 +5,44 @@ from openai import AsyncOpenAI
from pydantic import BaseModel, Field, field_validator
from typing import List
from enum import Enum
import diskcache
import os
import inspect
import functools
client = instructor.patch(AsyncOpenAI(), mode=instructor.Mode.TOOLS)
client = AsyncOpenAI()
client = instructor.patch(client, mode=instructor.Mode.TOOLS)
sem = asyncio.Semaphore(5)
pwd = os.getcwd()
cache = diskcache.Cache(pwd)
def instructor_cache(func):
    """Cache a function that returns a Pydantic model.

    Results are stored in the module-level ``cache`` as the JSON produced by
    ``model_dump_json()`` and rebuilt on a cache hit with
    ``model_validate_json()`` on the function's annotated return type.

    Args:
        func: A plain or ``async def`` function whose return annotation is a
            ``BaseModel`` subclass.

    Returns:
        An async wrapper when ``func`` is a coroutine function, otherwise a
        synchronous wrapper; both are decorated with ``functools.wraps``.

    Raises:
        ValueError: If the return annotation is not a ``BaseModel`` subclass.
    """
    return_type = inspect.signature(func).return_annotation
    try:
        is_model = issubclass(return_type, BaseModel)
    except TypeError:
        # issubclass() rejects non-class annotations (e.g. List[Model] or a
        # string annotation); report those like any other unsupported
        # return type instead of leaking a TypeError.
        is_model = False
    if not is_model:
        raise ValueError("The return type must be a Pydantic model")

    def make_key(args, kwargs):
        # NOTE: functools._make_key is a private helper; it is kept so the
        # key format matches entries already written to the cache.
        return f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}"

    def load(key):
        # Deserialize from JSON based on the return type; None means a miss.
        cached = cache.get(key)
        if cached is None:
            return None
        return return_type.model_validate_json(cached)

    def store(key, result):
        # Persist the JSON form and hand the live object back to the caller.
        cache.set(key, result.model_dump_json())
        return result

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        key = make_key(args, kwargs)
        if (hit := load(key)) is not None:
            return hit
        return store(key, await func(*args, **kwargs))

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        key = make_key(args, kwargs)
        if (hit := load(key)) is not None:
            return hit
        return store(key, func(*args, **kwargs))

    return wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
class QuestionType(Enum):
    # Closed set of labels a question may be classified as.
    # Every member's value is spelled exactly like its name.

    # Questions about who or what something belongs to / how to reach someone
    CONTENT_OWNERSHIP = "CONTENT_OWNERSHIP"
    CONTACT = "CONTACT"

    # Questions that search or compare stored material
    TIMELINE_QUERY = "TIMELINE_QUERY"
    DOCUMENT_SEARCH = "DOCUMENT_SEARCH"
    COMPARE_CONTRAST = "COMPARE_CONTRAST"
    MEETING_TRANSCRIPTS = "MEETING_TRANSCRIPTS"
    EMAIL = "EMAIL"
    PHOTOS = "PHOTOS"

    # Meta questions and broad retrieval / summarisation
    HOW_DOES_THIS_WORK = "HOW_DOES_THIS_WORK"
    NEEDLE_IN_HAYSTACK = "NEEDLE_IN_HAYSTACK"
    SUMMARY = "SUMMARY"
ALLOWED_TYPES = [t.value for t in QuestionType]
# You can add more instructions and examples in the description
# or you can put it in the prompt in `messages=[...]`
class QuestionClassification(BaseModel):
"""
Predict the type of question that is being asked.
Here are some tips on how to predict the question type:
CONTENT_OWNERSHIP: "Who owns the a certain piece of content?"
CONTACT: Searches for some contact information.
TIMELINE_QUERY: "When did something happen?
DOCUMENT_SEARCH: "Find me a document"
COMPARE_CONTRAST: "Compare and contrast two things"
MEETING_TRANSCRIPTS: "Find me a transcript of a meeting, or a soemthing said in a meeting"
EMAIL: "Find me an email, search for an email"
PHOTOS: "Find me a photo, search for a photo"
HOW_DOES_THIS_WORK: "How does this question /answer product work?"
NEEDLE_IN_HAYSTACK: "Find me something specific in a large amount of data"
SUMMARY: "Summarize a large amount of data"
"""
# If you want only one classification, just change it to
# `classification: QuestionType` rather than `classifications: List[QuestionType]``
chain_of_thought: str = Field(
..., description="The chain of thought that led to the classification"
)
classification: List[QuestionType] = Field(
description=f"An accuracy and correct prediction predicted class of question. Only allowed types: {ALLOWED_TYPES}, should be used",
description=f"An accuracy and correct prediction predicted class of question. Only allowed types: {[t.value for t in QuestionType]}, should be used",
)
@field_validator("classification", mode="before")
@@ -106,52 +53,48 @@ class QuestionClassification(BaseModel):
return v
@instructor_cache
async def classify_question(user_question: str) -> QuestionClassification:
    """Ask the model to classify one question.

    The call goes through the module-level ``client`` with
    ``response_model=QuestionClassification``; the decorator caches the
    result per argument.
    """
    prompt = f"Classify the following question: {user_question}"
    conversation = [{"role": "user", "content": prompt}]
    return await client.chat.completions.create(
        model="gpt-4",
        response_model=QuestionClassification,
        max_retries=2,
        messages=conversation,
    )
# Modify the classify function
async def classify(data: str) -> QuestionClassification:
async with sem: # some simple rate limiting
return data, await classify_question(data)
return data, await client.chat.completions.create(
model="gpt-4",
response_model=QuestionClassification,
max_retries=2,
messages=[
{
"role": "user",
"content": f"Classify the following question: {data}",
},
],
)
async def main(
questions: List[str], *, path_to_jsonl: str = None
) -> List[QuestionClassification]:
async def main(questions: List[str]):
tasks = [classify(question) for question in questions]
resps = []
for task in asyncio.as_completed(tasks):
question, label = await task
resp = {
"question": question,
"classification": [c.value for c in label.classification],
"chain_of_thought": label.chain_of_thought,
}
print(resp)
if path_to_jsonl:
with open(path_to_jsonl, "a") as f:
json_dump = json.dumps(resp)
f.write(json_dump + "\n")
resps.append(resp)
return resps
if __name__ == "__main__":
import asyncio
path = "./data.jsonl"
questions = [
"What was that ai app that i saw on the news the other day?",
"What was that ai app that i saw on the news the other day?",
"What was that ai app that i saw on the news the other day?",
"Can you find the trainline booking email?",
"What was the book I saw on amazon yesturday?",
"Can you speak german?",
"Do you have access to the meeting transcripts?",
"what are the recent sites I visited?",
"what did I do on Monday?",
"Tell me about todays meeting and how it relates to the email on Monday",
]
asyncio.run(main(questions, path_to_jsonl=path))
asyncio.run(main(questions))
+7 -21
View File
@@ -7,53 +7,43 @@ from pydantic import BaseModel, Field, field_validator
from typing import List
from enum import Enum
client = instructor.patch(AsyncOpenAI(), mode=instructor.Mode.TOOLS)
client = AsyncOpenAI()
client = instructor.patch(client, mode=instructor.Mode.TOOLS)
sem = asyncio.Semaphore(5)
class QuestionType(Enum):
    # Closed set of labels a question may be classified as.
    # Every member's value is spelled exactly like its name.

    # Questions about who or what something belongs to / how to reach someone
    CONTENT_OWNERSHIP = "CONTENT_OWNERSHIP"
    CONTACT = "CONTACT"

    # Questions that search or compare stored material
    TIMELINE_QUERY = "TIMELINE_QUERY"
    DOCUMENT_SEARCH = "DOCUMENT_SEARCH"
    COMPARE_CONTRAST = "COMPARE_CONTRAST"
    MEETING_TRANSCRIPTS = "MEETING_TRANSCRIPTS"
    EMAIL = "EMAIL"
    PHOTOS = "PHOTOS"

    # Meta questions and broad retrieval / summarisation
    HOW_DOES_THIS_WORK = "HOW_DOES_THIS_WORK"
    NEEDLE_IN_HAYSTACK = "NEEDLE_IN_HAYSTACK"
    SUMMARY = "SUMMARY"
ALLOWED_TYPES = [t.value for t in QuestionType]
# You can add more instructions and examples in the description
# or you can put it in the prompt in `messages=[...]`
class QuestionClassification(BaseModel):
"""
Predict the type of question that is being asked.
Here are some tips on how to predict the question type:
CONTENT_OWNERSHIP: "Who owns the a certain piece of content?"
CONTACT: Searches for some contact information.
TIMELINE_QUERY: "When did something happen?
DOCUMENT_SEARCH: "Find me a document"
COMPARE_CONTRAST: "Compare and contrast two things"
MEETING_TRANSCRIPTS: "Find me a transcript of a meeting, or a soemthing said in a meeting"
EMAIL: "Find me an email, search for an email"
PHOTOS: "Find me a photo, search for a photo"
HOW_DOES_THIS_WORK: "How does this question /answer product work?"
NEEDLE_IN_HAYSTACK: "Find me something specific in a large amount of data"
SUMMARY: "Summarize a large amount of data"
"""
# If you want only one classification, just change it to
# `classification: QuestionType` rather than `classifications: List[QuestionType]``
chain_of_thought: str = Field(
..., description="The chain of thought that led to the classification"
)
classification: List[QuestionType] = Field(
description=f"An accuracy and correct prediction predicted class of question. Only allowed types: {ALLOWED_TYPES}, should be used",
description=f"An accuracy and correct prediction predicted class of question. Only allowed types: {[t.value for t in QuestionType]}, should be used",
)
@field_validator("classification", mode="before")
@@ -64,7 +54,6 @@ class QuestionClassification(BaseModel):
return v
# Modify the classify function
async def classify(data: str) -> QuestionClassification:
async with sem: # some simple rate limiting
return data, await client.chat.completions.create(
@@ -100,9 +89,6 @@ async def main(
if __name__ == "__main__":
import asyncio
path = "./data.jsonl"
# Obviously we might want to big query or
# load this from a file or something???
questions = [
"What was that ai app that i saw on the news the other day?",
"Can you find the trainline booking email?",
@@ -114,4 +100,4 @@ if __name__ == "__main__":
"Tell me about todays meeting and how it relates to the email on Monday",
]
asyncio.run(main(questions, path_to_jsonl=path))
asyncio.run(main(questions))
@@ -0,0 +1,104 @@
import instructor
import asyncio
from langsmith import traceable
from langsmith.wrappers import wrap_openai
from openai import AsyncOpenAI
from pydantic import BaseModel, Field, field_validator
from typing import List
from enum import Enum
# Build the async OpenAI client in two steps: first pass it through
# langsmith's wrap_openai, then patch it with instructor in TOOLS mode. The
# create() call in classify() passes response_model= and max_retries= to
# this patched client.
client = wrap_openai(AsyncOpenAI())
client = instructor.patch(client, mode=instructor.Mode.TOOLS)
# Shared semaphore: classify() holds it for the duration of its request, so
# at most 5 requests are in flight at once.
sem = asyncio.Semaphore(5)
class QuestionType(Enum):
    # Closed set of labels a question may be classified as.
    # Every member's value is spelled exactly like its name.

    # Looking up people or points in time
    CONTACT = "CONTACT"
    TIMELINE_QUERY = "TIMELINE_QUERY"

    # Searching or comparing stored material
    DOCUMENT_SEARCH = "DOCUMENT_SEARCH"
    COMPARE_CONTRAST = "COMPARE_CONTRAST"
    EMAIL = "EMAIL"
    PHOTOS = "PHOTOS"

    # Condensing a large amount of data
    SUMMARY = "SUMMARY"
# You can add more instructions and examples in the description
# or you can put it in the prompt in `messages=[...]`
class QuestionClassification(BaseModel):
    """
    Predict the type of question that is being asked.
    Here are some tips on how to predict the question type:
    CONTACT: Searches for some contact information.
    TIMELINE_QUERY: "When did something happen?
    DOCUMENT_SEARCH: "Find me a document"
    COMPARE_CONTRAST: "Compare and contrast two things"
    EMAIL: "Find me an email, search for an email"
    PHOTOS: "Find me a photo, search for a photo"
    SUMMARY: "Summarize a large amount of data"
    """

    # If you want only one classification, just change the field to
    # `classification: QuestionType` rather than
    # `classification: List[QuestionType]`.
    chain_of_thought: str = Field(
        ..., description="The chain of thought that led to the classification"
    )
    # The allowed values are rendered into the description from the enum, so
    # the text follows QuestionType automatically.
    classification: List[QuestionType] = Field(
        description=f"An accuracy and correct prediction predicted class of question. Only allowed types: {[t.value for t in QuestionType]}, should be used",
    )

    @field_validator("classification", mode="before")
    @classmethod
    def validate_classification(cls, v):
        """Wrap a lone value in a list before field validation runs."""
        # sometimes the API returns a single value, just make sure it's a list
        return v if isinstance(v, list) else [v]
# Classify one question; concurrency is bounded by the shared semaphore.
@traceable(name="classify-question")
async def classify(data: str) -> tuple:
    """Classify a single question and return it paired with its label.

    Args:
        data: The question text to classify.

    Returns:
        A ``(data, classification)`` tuple: the input question and the
        ``QuestionClassification`` produced for it. The question is returned
        alongside the label because ``main`` consumes results in completion
        order rather than submission order.
    """
    async with sem:  # some simple rate limiting
        return data, await client.chat.completions.create(
            model="gpt-4",
            response_model=QuestionClassification,
            max_retries=2,
            messages=[
                {
                    "role": "user",
                    "content": f"Classify the following question: {data}",
                },
            ],
        )
async def main(questions: List[str]):
    """Classify all questions concurrently and collect the results.

    Returns a list of dicts with the keys ``question``, ``classification``
    (the enum values of the predicted types) and ``chain_of_thought``,
    ordered by when each classification finished.
    """
    pending = [classify(text) for text in questions]
    collected = []
    for finished in asyncio.as_completed(pending):
        asked, outcome = await finished
        collected.append(
            {
                "question": asked,
                "classification": [kind.value for kind in outcome.classification],
                "chain_of_thought": outcome.chain_of_thought,
            }
        )
    return collected
if __name__ == "__main__":
    import asyncio

    # Sample questions used to exercise the classifier end to end.
    questions = [
        "What was that ai app that i saw on the news the other day?",
        "Can you find the trainline booking email?",
        "What was the book I saw on amazon yesturday?",
        "Can you speak german?",
        "Do you have access to the meeting transcripts?",
        "what are the recent sites I visited?",
        "what did I do on Monday?",
        "Tell me about todays meeting and how it relates to the email on Monday",
    ]

    # Run the whole batch on a fresh event loop; the return value is not used.
    asyncio.run(main(questions))
+68 -50
View File
@@ -1,6 +1,6 @@
from openai import OpenAI
from io import StringIO
from typing import Annotated, Any, Iterable
from typing import Annotated, Any, List
from pydantic import (
BaseModel,
BeforeValidator,
@@ -8,11 +8,12 @@ from pydantic import (
InstanceOf,
WithJsonSchema,
)
import pandas as pd
import instructor
import pandas as pd
client = instructor.patch(OpenAI(), mode=instructor.function_calls.Mode.MD_JSON)
client = OpenAI()
client = instructor.patch(client, mode=instructor.function_calls.Mode.MD_JSON)
def md_to_df(data: Any) -> Any:
@@ -51,53 +52,70 @@ class Table(BaseModel):
dataframe: MarkdownDataFrame
tables = client.chat.completions.create(
model="gpt-4-vision-preview",
max_tokens=1000,
response_model=Iterable[Table],
messages=[
{
"role": "user",
"content": [
class MultipleTables(BaseModel):
tables: List[Table]
example = MultipleTables(
tables=[
Table(
caption="This is a caption",
dataframe=pd.DataFrame(
{
"type": "text",
"text": "Describe this data accurately as a table in markdown format.",
},
{
"type": "image_url",
"image_url": {
# "url": "https://a.storyblok.com/f/47007/2400x1260/f816b031cb/uk-ireland-in-three-charts_chart_a.png/m/2880x0",
# "url": "https://a.storyblok.com/f/47007/2400x2000/bf383abc3c/231031_uk-ireland-in-three-charts_table_v01_b.png/m/2880x0",
# "url": "https://a.storyblok.com/f/47007/4800x2766/1688e25601/230629_attoptinratesmidyear_blog_chart02_v01.png/m/2880x0"
"url": "https://a.storyblok.com/f/47007/2400x1260/934d294894/uk-ireland-in-three-charts_chart_b.png/m/2880x0"
},
},
{
"type": "text",
"text": """
First take a moment to reason about the best set of headers for the tables.
Write a good h1 for the image above. Then follow up with a short description of the what the data is about.
Then for each table you identified, write a h2 tag that is a descriptive title of the table.
Then follow up with a short description of the what the data is about.
Lastly, produce the markdown table for each table you identified.
""",
},
],
}
],
"Chart A": [10, 40],
"Chart B": [20, 50],
"Chart C": [30, 60],
}
),
)
]
)
for table in tables:
print(table.caption)
print(table.dataframe)
print()
"""
D1 App Retention Rates July 2023 (Ireland & U.K.)
Ireland UK
Category
Education 14% 12%
Entertainment 13% 11%
Games 26% 25%
Social 27% 18%
Utilities 11% 9%
"""
def extract(url: str) -> dict:
    """Extract the tables shown in an image as structured data.

    Args:
        url: URL of the image; sent to the model as an ``image_url`` part.

    Returns:
        The ``MultipleTables`` response converted with ``model_dump()``,
        i.e. a plain dict rather than the model instance.
    """
    response = client.chat.completions.create(
        model="gpt-4-vision-preview",
        max_tokens=4000,
        response_model=MultipleTables,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        # The serialized example is appended to the
                        # instruction text as a sample of the output shape.
                        "text": f"Describe this data accurately as a table in markdown format. {example.model_dump_json(indent=2)}",
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": url},
                    },
                    {
                        "type": "text",
                        # Prompt lines stay flush-left so the string sent to
                        # the model is unchanged.
                        "text": """
First take a moment to reason about the best set of headers for the tables.
Write a good h1 for the image above. Then follow up with a short description of the what the data is about.
Then for each table you identified, write a h2 tag that is a descriptive title of the table.
Then follow up with a short description of the what the data is about.
Lastly, produce the markdown table for each table you identified.
Make sure to escape the markdown table properly, and make sure to include the caption and the dataframe.
including escaping all the newlines and quotes. Only return a markdown table in dataframe, nothing else.
""",
                    },
                ],
            }
        ],
    )
    return response.model_dump()
# Sample chart/table images to run the extraction against.
urls = [
    "https://a.storyblok.com/f/47007/2400x1260/f816b031cb/uk-ireland-in-three-charts_chart_a.png/m/2880x0",
    "https://a.storyblok.com/f/47007/2400x2000/bf383abc3c/231031_uk-ireland-in-three-charts_table_v01_b.png/m/2880x0",
]

# Extract each image in turn and print the dumped result.
for url in urls:
    tables = extract(url)
    print(tables)
@@ -0,0 +1,124 @@
from openai import OpenAI
from io import StringIO
from typing import Annotated, Any, List
from pydantic import (
BaseModel,
BeforeValidator,
PlainSerializer,
InstanceOf,
WithJsonSchema,
)
import instructor
import pandas as pd
from langsmith.wrappers import wrap_openai
from langsmith import traceable
# Build the synchronous OpenAI client in two steps: first pass it through
# langsmith's wrap_openai, then patch it with instructor in MD_JSON mode. The
# create() call in extract() passes response_model= to this patched client.
client = wrap_openai(OpenAI())
client = instructor.patch(client, mode=instructor.function_calls.Mode.MD_JSON)
def md_to_df(data: Any) -> Any:
    """Convert a markdown table string into a ``pandas.DataFrame``.

    Args:
        data: A pipe-delimited markdown table as a string, or any other
            object (for example an existing DataFrame).

    Returns:
        For a string: a DataFrame indexed by the table's first column, with
        surrounding whitespace stripped from every string cell. Empty cells
        stay missing (NaN). Column labels and index values are left exactly
        as parsed. Any non-string input is returned unchanged.
    """
    if not isinstance(data, str):
        return data

    frame = (
        pd.read_csv(StringIO(data), sep="|", index_col=1)
        # The leading and trailing "|" of each row parse as empty columns.
        .dropna(axis=1, how="all")
        # In a standard markdown table the first parsed row is the "---"
        # separator line, so drop it.
        .iloc[1:]
    )

    def strip_cell(cell):
        # An empty cell is parsed as NaN (a float), which has no .strip().
        return cell.strip() if isinstance(cell, str) else cell

    # DataFrame.map only exists on pandas >= 2.1; older releases expose the
    # same element-wise operation as applymap.
    if hasattr(pd.DataFrame, "map"):
        return frame.map(strip_cell)
    return frame.applymap(strip_cell)
# Annotated DataFrame type for use as a pydantic field:
# - BeforeValidator(md_to_df) turns an incoming markdown string into a
#   DataFrame before the InstanceOf[pd.DataFrame] check runs,
# - PlainSerializer dumps the frame with DataFrame.to_markdown(),
# - WithJsonSchema advertises the field as a plain string carrying the
#   description below.
MarkdownDataFrame = Annotated[
InstanceOf[pd.DataFrame],
BeforeValidator(md_to_df),
PlainSerializer(lambda x: x.to_markdown()),
WithJsonSchema(
{
"type": "string",
"description": """
The markdown representation of the table,
each one should be tidy, do not try to join tables
that should be seperate""",
}
),
]
class Table(BaseModel):
    # One extracted table: its caption plus the tabular data itself.
    caption: str
    # Typed as MarkdownDataFrame, the annotated DataFrame alias defined above.
    dataframe: MarkdownDataFrame
class MultipleTables(BaseModel):
    # Container used as the response model: every table found in one image.
    tables: List[Table]
# Hand-built sample instance. extract() serializes it with
# model_dump_json(indent=2) and appends it to the instruction text. The
# DataFrame is passed in directly rather than as a markdown string.
example = MultipleTables(
tables=[
Table(
caption="This is a caption",
dataframe=pd.DataFrame(
{
"Chart A": [10, 40],
"Chart B": [20, 50],
"Chart C": [30, 60],
}
),
)
]
)
@traceable(name="extract-table")
def extract(url: str) -> dict:
    """Extract the tables shown in an image as structured data.

    Args:
        url: URL of the image; sent to the model as an ``image_url`` part.

    Returns:
        The ``MultipleTables`` response converted with ``model_dump()``,
        i.e. a plain dict rather than the model instance.
    """
    response = client.chat.completions.create(
        model="gpt-4-vision-preview",
        max_tokens=4000,
        response_model=MultipleTables,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        # The serialized example is appended to the
                        # instruction text as a sample of the output shape.
                        "text": f"Describe this data accurately as a table in markdown format. {example.model_dump_json(indent=2)}",
                    },
                    {
                        "type": "image_url",
                        "image_url": {"url": url},
                    },
                    {
                        "type": "text",
                        # Prompt lines stay flush-left so the string sent to
                        # the model is unchanged.
                        "text": """
First take a moment to reason about the best set of headers for the tables.
Write a good h1 for the image above. Then follow up with a short description of the what the data is about.
Then for each table you identified, write a h2 tag that is a descriptive title of the table.
Then follow up with a short description of the what the data is about.
Lastly, produce the markdown table for each table you identified.
Make sure to escape the markdown table properly, and make sure to include the caption and the dataframe.
including escaping all the newlines and quotes. Only return a markdown table in dataframe, nothing else.
""",
                    },
                ],
            }
        ],
    )
    return response.model_dump()
# Sample chart/table images to run the extraction against.
urls = [
    "https://a.storyblok.com/f/47007/2400x1260/f816b031cb/uk-ireland-in-three-charts_chart_a.png/m/2880x0",
    "https://a.storyblok.com/f/47007/2400x2000/bf383abc3c/231031_uk-ireland-in-three-charts_table_v01_b.png/m/2880x0",
]

# Extract each image in turn and print the dumped result.
for url in urls:
    tables = extract(url)
    print(tables)