diff --git a/langchain/document_loaders/apify_dataset.py b/langchain/document_loaders/apify_dataset.py
index e8d130597..469ae773c 100644
--- a/langchain/document_loaders/apify_dataset.py
+++ b/langchain/document_loaders/apify_dataset.py
@@ -50,5 +50,7 @@ class ApifyDatasetLoader(BaseLoader, BaseModel):
 
     def load(self) -> List[Document]:
         """Load documents."""
-        dataset_items = self.apify_client.dataset(self.dataset_id).list_items().items
+        dataset_items = (
+            self.apify_client.dataset(self.dataset_id).list_items(clean=True).items
+        )
         return list(map(self.dataset_mapping_function, dataset_items))
diff --git a/langchain/utilities/apify.py b/langchain/utilities/apify.py
index bf1527f1c..c426ee8b2 100644
--- a/langchain/utilities/apify.py
+++ b/langchain/utilities/apify.py
@@ -121,3 +121,85 @@ class ApifyWrapper(BaseModel):
             dataset_id=actor_call["defaultDatasetId"],
             dataset_mapping_function=dataset_mapping_function,
         )
+
+    def call_actor_task(
+        self,
+        task_id: str,
+        task_input: Dict,
+        dataset_mapping_function: Callable[[Dict], Document],
+        *,
+        build: Optional[str] = None,
+        memory_mbytes: Optional[int] = None,
+        timeout_secs: Optional[int] = None,
+    ) -> ApifyDatasetLoader:
+        """Run a saved Actor task on Apify and wait for results to be ready.
+
+        Args:
+            task_id (str): The ID or name of the task on the Apify platform.
+            task_input (Dict): The input object of the task that you're trying to run.
+                Overrides the task's saved input.
+            dataset_mapping_function (Callable): A function that takes a single
+                dictionary (an Apify dataset item) and converts it to an
+                instance of the Document class.
+            build (str, optional): Optionally specifies the actor build to run.
+                It can be either a build tag or build number.
+            memory_mbytes (int, optional): Optional memory limit for the run,
+                in megabytes.
+            timeout_secs (int, optional): Optional timeout for the run, in seconds.
+
+        Returns:
+            ApifyDatasetLoader: A loader that will fetch the records from the
+                task run's default dataset.
+        """
+        task_call = self.apify_client.task(task_id).call(
+            task_input=task_input,
+            build=build,
+            memory_mbytes=memory_mbytes,
+            timeout_secs=timeout_secs,
+        )
+
+        return ApifyDatasetLoader(
+            dataset_id=task_call["defaultDatasetId"],
+            dataset_mapping_function=dataset_mapping_function,
+        )
+
+    async def acall_actor_task(
+        self,
+        task_id: str,
+        task_input: Dict,
+        dataset_mapping_function: Callable[[Dict], Document],
+        *,
+        build: Optional[str] = None,
+        memory_mbytes: Optional[int] = None,
+        timeout_secs: Optional[int] = None,
+    ) -> ApifyDatasetLoader:
+        """Run a saved Actor task on Apify and wait for results to be ready.
+
+        Args:
+            task_id (str): The ID or name of the task on the Apify platform.
+            task_input (Dict): The input object of the task that you're trying to run.
+                Overrides the task's saved input.
+            dataset_mapping_function (Callable): A function that takes a single
+                dictionary (an Apify dataset item) and converts it to an
+                instance of the Document class.
+            build (str, optional): Optionally specifies the actor build to run.
+                It can be either a build tag or build number.
+            memory_mbytes (int, optional): Optional memory limit for the run,
+                in megabytes.
+            timeout_secs (int, optional): Optional timeout for the run, in seconds.
+
+        Returns:
+            ApifyDatasetLoader: A loader that will fetch the records from the
+                task run's default dataset.
+        """
+        task_call = await self.apify_client_async.task(task_id).call(
+            task_input=task_input,
+            build=build,
+            memory_mbytes=memory_mbytes,
+            timeout_secs=timeout_secs,
+        )
+
+        return ApifyDatasetLoader(
+            dataset_id=task_call["defaultDatasetId"],
+            dataset_mapping_function=dataset_mapping_function,
+        )