Implement streaming entities via MultiTask (#64)

* add streaming tasks

* push docs
This commit is contained in:
Jason Liu
2023-07-19 01:21:24 +08:00
committed by GitHub
parent b5959bdbc9
commit 58bc3bae0f
5 changed files with 220 additions and 17 deletions
+3
View File
@@ -0,0 +1,3 @@
# API: MultiTask
::: openai_function_call.dsl.multitask
+103 -12
View File
@@ -1,18 +1,109 @@
# MultiTask
# Patterns for Multiple Extraction
Defining a task and creating a list of classes is a common enough pattern that we define a helper function `MultiTask` that dynamically creates a new schema that has a task attribute defined as a list of the task subclass, including some prebuilt prompts and allows us to avoid writing some extra code.
A common use case of structured extraction is defining a single schema class and then making another schema to create a list to do multiple extraction
!!! example "Extending user details"
```python
class User(OpenAISchema):
name: str
age: int
Using the previous example with extracting `UserDetails` we might want to extract multiple users rather than a single user, `MultiTask` makes it easy!
class Users(OpenAISchema):
users: List[User]
```
```python
class UserDetails(OpenAISchema):
"""Details of a user"""
name: str = Field(..., description="users's full name")
age: int
MultiUserDetails = MultiTask(UserDetails)
Defining a task and creating a list of classes is a common enough pattern that we define a helper function `MultiTask` It procides a function to dynamically create a new class that:
1. Dynamic docstrings and class name baed on the task
2. Helper method to support streaming by collectin function_call tokens until a object back out.
## Extracting Tasks
By using multitask you get a very convient class with prompts and names automatically defined. You get `from_response` just like any other `OpenAISchema` you're able to extract the list of objects data you want with `MultTask.tasks`.
```python hl_lines="13"
from openai_function_call import OpenAISchema, MultiTask
class User(OpenAISchema):
name: str
age: int
MultiUser = MultiTask(User)
completion = openai.ChatCompletion.create(
model="gpt-4-0613",
temperature=0.1,
stream=False,
functions=[MultiUser.openai_schema],
function_call={"name": MultiUser.openai_schema["name"]},
messages=[
{
"role": "user",
"content": f"Consider the data below: Jason is 10 and John is 30",
},
],
max_tokens=1000,
)
MultiUser.from_response(completion)
```
```sh
{"tasks": [
{"name": "Jason", "age": 10},
{"name": "John", "age": 30}
]}
```
## Streaming Tasks
Since a `MultiTask(T)` is well contrained to `tasks: List[T]` we can make assuptions on how tokens are used and provide a helper method that allows you generate tasks as the the tokens are streamed in
!!! tips "Why would we want this?"
While `gpt-3.5-turbo` is quite fast `gpt-4` will take a while if there are many objects or if each object schema is complex. If 10 entities are created and takes 100ms to complete it would mean that it would take 1 second before we had access to our objects. With streaming you'd get the first object in 100ms a 10x percieved improvement in latency! While this may not make sense for more usecases if we were dynamitcally building UI based on entities, streaming entities 1 by 1 could improve the user experience dramatically.
Lets look at an example in action with the same class
```python hl_lines="6 26"
MultiUser = MultiTask(User)
completion = openai.ChatCompletion.create(
model="gpt-4-0613",
temperature=0.1,
stream=True,
functions=[MultiUser.openai_schema],
function_call={"name": MultiUser.openai_schema["name"]},
messages=[
{
"role": "system",
"content": "You are a perfect entity extraction system",
},
{
"role": "user",
"content": (
f"Consider the data below:\n{input}"
"Correctly segment it into entitites"
"Make sure the JSON is correct"
),
},
],
max_tokens=1000,
)
for user in MultiUser.from_streaming_response(completion):
assert isinstance(user, User)
print(user)
>>> name="Jason" "age"=10
>>> name="John" "age"=10
```
!!! usage "How??"
Consider this incomplete json string.
```json
{"tasks": [{"name": "Jason", "age": 10}
```
::: openai_function_call.dsl.multitask
Notice how, while this isn't valid json, we know that one complete `User` object was generated so we `yield` that object to be used elsewhere as soon as possible.
This streaming is still a prototype, but should work quite well for simple schemas.
@@ -0,0 +1,59 @@
from typing import Iterable
import openai
import time
from openai_function_call import MultiTask, OpenAISchema
class User(OpenAISchema):
name: str
job: str
age: int
def stream_extract(input: str, cls) -> Iterable[User]:
MultiUser = MultiTask(cls)
completion = openai.ChatCompletion.create(
model="gpt-4-0613",
temperature=0.1,
stream=True,
functions=[MultiUser.openai_schema],
function_call={"name": MultiUser.openai_schema["name"]},
messages=[
{
"role": "system",
"content": "You are a perfect entity extraction system",
},
{
"role": "user",
"content": (
f"Consider the data below:\n{input}"
"Correctly segment it into entitites"
"Make sure the JSON is correct"
),
},
],
max_tokens=1000,
)
return MultiUser.from_streaming_response(completion)
start = time.time()
for user in stream_extract(
input="Create 10 characters from the book Three Body Problem",
cls=User,
):
delay = (time.time() - start) * 100
print(f"{int(delay)} ms: User({user})")
"""
561 ms: User(name='Ye Wenjie' job='Astrophysicist' age=50)
713 ms: User(name='Wang Miao' job='Nanomaterials Researcher' age=40)
836 ms: User(name='Shi Qiang' job='Detective' age=45)
1001 ms: User(name='Ding Yi' job='Theoretical Physicist' age=42)
1136 ms: User(name='Chang Weisi' job='Major General' age=55)
1274 ms: User(name='Zhang Beihai' job='Space Force Naval Officer' age=52)
1499 ms: User(name='Luo Ji' job='Astronomer' age=48)
1612 ms: User(name='Wei Cheng' job='Mathematician' age=46)
1774 ms: User(name='Shen Yufei' job='Physicist' age=39)
1904 ms: User(name='Pan Han' job='Engineer' age=43)
"""
+3 -2
View File
@@ -44,7 +44,8 @@ markdown_extensions:
- admonition
nav:
- Introduction:
- Function Calls: 'index.md'
- OpenAISchema: 'index.md'
- MultiTask: "multitask.md"
- Philosophy: 'philosophy.md'
- Use Cases:
- 'Overview': 'examples/index.md'
@@ -56,7 +57,7 @@ nav:
- 'Creating multiple file programs': "examples/gpt-engineer.md"
- API Reference:
- 'OpenAISchema': 'openai_schema.md'
- "MultiTask Schema": "multitask.md"
- 'MultiTask': 'api_multitask.md'
- "Introduction: Writing Prompts": "writing-prompts.md"
- "Prompting Templates": "chat-completion.md"
extra:
+52 -3
View File
@@ -1,6 +1,50 @@
from pydantic import create_model, Field
from typing import Optional, List, Type
from ..function_calls import OpenAISchema
from openai_function_call import OpenAISchema
class MultiTaskBase:
task_type = None # type: ignore
@classmethod
def from_streaming_response(cls, completion):
json_chunks = cls.extract_json(completion)
yield from cls.tasks_from_chunks(json_chunks)
@classmethod
def tasks_from_chunks(cls, json_chunks):
started = False
potential_object = ""
for chunk in json_chunks:
potential_object += chunk
if not started:
if "[" in chunk:
started = True
potential_object = chunk[chunk.find("[") + 1 :]
continue
task_json, potential_object = cls.get_object(potential_object, 0)
if task_json:
obj = cls.task_type.model_validate_json(task_json) # type: ignore
yield obj
@staticmethod
def extract_json(completion):
for chunk in completion:
delta = chunk["choices"][0]["delta"]
if "function_call" in delta:
yield delta["function_call"]["arguments"]
@staticmethod
def get_object(str, stack):
for i, c in enumerate(str):
if c == "{":
stack += 1
if c == "}":
stack -= 1
if stack == 0:
return str[: i + 1], str[i + 2 :]
return None, str
def MultiTask(
@@ -30,7 +74,6 @@ def MultiTask(
)
```
Parameters:
subtask_class (Type[OpenAISchema]): The base class to use for the MultiTask
name (Optional[str]): The name of the MultiTask class, if None then the name
@@ -54,7 +97,13 @@ def MultiTask(
),
)
new_cls = create_model(name, tasks=list_tasks, __base__=(OpenAISchema,))
new_cls = create_model(
name,
tasks=list_tasks,
__base__=(OpenAISchema, MultiTaskBase),
)
# set the class constructor BaseModel
new_cls.task_type = subtask_class
new_cls.__doc__ = (
f"Correct segmentation of `{task_name}` tasks"