Add NeonClient2 package and update NeonClient import

This commit is contained in:
2024-01-17 15:51:42 -05:00
parent c13ea6b435
commit c2e42d62d6
14 changed files with 1584 additions and 204 deletions
+15 -6
View File
@@ -1,28 +1,37 @@
gen-model: fetch-v2-schema
datamodel-codegen \
--input v2.json \
--input-file-type jsonschema \
--collapse-root-models \
--output neon_client/openapi_models.py \
--output-model-type pydantic_v2.BaseModel \
--output neon_client/jsonschema.py \
--use-standard-collections \
--output-model-type pydantic_v2.BaseModel \
--input-file-type openapi \
--use-standard-collections \
# --use-standard-collections \
--use-union-operator \
--target-python-version 3.11 \
--use-schema-description \
--snake-case-field \
--enable-version-header \
--use-double-quotes \
--field-constraints \
--allow-population-by-field-name \
--use-title-as-name \
--reuse-model \
# --field-constraints \
--field-constraints \
--disable-appending-item-suffix \
--allow-extra-fields \
--use-annotated \
--capitalise-enum-members \
--use-unique-items-as-set
--use-unique-items-as-set \
--set-default-enum-member \
--enum-field-as-literal one \
--openapi-scopes {schemas,paths,tags,parameters} \
--use-operation-id-as-name \
# --field-constraints \
# --use-annotated \
fetch-v2-schema:
curl -O https://neon.tech/api_spec/release/v2.json
+1 -1
View File
@@ -1 +1 @@
from .client import NeonClient
from .client import NeonAPI
+103 -197
View File
@@ -1,219 +1,125 @@
from .http_client import Neon_API_V2
from .resources import ResourceCollection
import os
from collections.abc import Sequence
from . import models
import requests
from pydantic import BaseModel, ValidationError
from .jsonschema import (
ApiKeysListResponseItem,
ApiKeyCreateRequest,
ApiKeyCreateResponse,
ApiKeyRevokeResponse,
)
from .jsonschema import CurrentUserInfoResponse
class ItemView:
"""A view into a single item."""
__VERSION__ = "0.1.0"
def __init__(self, item, key_id=None):
self._item = item
self._key_id = key_id
NEON_API_KEY_ENVIRON = "NEON_API_KEY"
NEON_API_BASE_URL = "https://console.neon.tech/api/v2/"
class NeonClientException(requests.exceptions.HTTPError):
pass
class APIKey:
"""A Neon API key."""
def __init__(
self,
client,
obj,
data_model: BaseModel,
**kwargs,
):
"""A Neon API key.
Args:
client (NeonAPI): The Neon API client.
obj (dict): The API key data.
data_model (BaseModel, optional): The data model to use for deserialization. Defaults to None.
"""
self._client = client
self._data = obj
self._data_model = data_model
self.__cached_obj = None
@property
def item(self):
if self._key_id:
return getattr(self._item, self._key_id)
def obj(self):
if not self.__cached_obj:
self.__cached_obj = self._data_model(**self._data)
return self._item
def __getattr__(self, name):
return getattr(self.item, name)
def __setattr__(self, name, value):
if name == "_item":
return super().__setattr__(name, value)
return setattr(self.item, name, value)
def __str__(self):
return str(self.item)
return self.__cached_obj
def __repr__(self):
return repr(self.item)
return repr(self.obj)
def __eq__(self, other):
return self.item == other
@classmethod
def create(cls, client, key_name: str):
"""Create a new API key."""
def __ne__(self, other):
return self.item != other
obj = ApiKeyCreateRequest(key_name=key_name)
r = client.request("POST", "api_keys", json=obj.model_dump())
return cls(
client=client,
obj=r,
data_model=ApiKeyCreateResponse,
)
# ApiKeyCreateResponse(**r)
class CollectionView:
"""A view into a collection of items."""
def __init__(self, collection, key_ids=None, collection_id=None):
self.pagination = None
if not key_ids:
key_ids = []
self._key_ids = key_ids
if collection_id:
self._collection = getattr(collection, collection_id)
try:
self.pagination = collection.pagination
except AttributeError:
pass
else:
self._collection = collection
def __iter__(self):
return iter(self._collection)
def __getitem__(self, key):
for k in self._key_ids:
for item in self._collection:
if str(getattr(item, k)) == str(key):
return item
return self._collection[key]
def __len__(self):
return len(self._collection)
def __repr__(self):
return repr(self._collection)
def __str__(self):
return str(self._collection)
def __contains__(self, item):
return item in self._collection
def __eq__(self, other):
return self._collection == other
def __ne__(self, other):
return self._collection != other
class NeonClient:
def __init__(self, api_key: str, **kwargs):
self.api = Neon_API_V2(api_key, **kwargs)
self.resources = ResourceCollection(self.api)
def me(self):
return self.resources.users.get_current_user_info()
def api_keys(self):
@classmethod
def list(cls, client):
"""Get a list of API keys."""
return CollectionView(self.resources.api_keys.get_list(), key_ids=["id"])
r = client.request("GET", "api_keys")
return [
cls(client=client, obj=x, data_model=ApiKeysListResponseItem) for x in r
]
def projects(self, shared=False, **kwargs):
"""Get a list of projects."""
@classmethod
def revoke(cls, client, api_key):
"""Revoke an API key."""
a = self.resources.projects.get_list(shared=shared, **kwargs)
print(a)
exit()
r = client.request("DELETE", f"api_keys/{ api_key.obj.id }")
return cls(client=client, obj=r, data_model=ApiKeyRevokeResponse)
return CollectionView(
self.resources.projects.get_list(shared=shared, **kwargs),
key_ids=["id", "name"],
collection_id="projects",
class NeonAPI:
def __init__(self, api_key, *, base_url=None):
if not base_url:
base_url = NEON_API_BASE_URL
# Private attributes.
self._api_key = api_key
self._session = requests.Session()
# Public attributes.
self.base_url = base_url
self.user_agent = f"neon-client/{__VERSION__}"
def request(self, method, path, **kwargs):
"""Send an HTTP request to the specified path using the specified method."""
# Set HTTP headers for outgoing requests.
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {self._api_key}"
headers["Accept"] = "application/json"
headers["Content-Type"] = "application/json"
headers["User-Agent"] = self.user_agent
r = self._session.request(
method, self.base_url + path, headers=headers, **kwargs
)
try:
r.raise_for_status()
except requests.exceptions.HTTPError:
raise NeonClientException(r.text)
def project(self, project_id: str, **kwargs):
"""Get a single project."""
return r.json()
return ItemView(
self.resources.projects.get(project_id, **kwargs), key_id="project"
)
@classmethod
def from_environ(cls):
"""Create a new Neon API client from the NEON_API_KEY environment variable."""
def project_create(self, **kwargs):
return ItemView(self.resources.projects.create(**kwargs), key_id="project")
def project_delete(self, project_id: str, **kwargs):
return ItemView(
self.resources.projects.delete(project_id, **kwargs), key_id="project"
)
def databases(self, project_id: str, branch_id: str, **kwargs):
return CollectionView(
self.resources.databases.get_list(project_id, branch_id, **kwargs),
key_ids=["id"],
collection_id="databases",
)
def database(self, project_id: str, database_id: str, **kwargs):
return ItemView(
self.resources.databases.get(project_id, database_id, **kwargs),
key_id="database",
)
def branches(self, project_id: str, **kwargs):
return CollectionView(
self.resources.branches.get_list(project_id, **kwargs),
key_ids=["id", "name"],
collection_id="branches",
)
def branch(self, project_id: str, branch_id: str):
return ItemView(
self.resources.branches.get(project_id, branch_id),
key_id="branch",
)
def branch_create(self, project_id: str, **kwargs):
return ItemView(
self.resources.branches.create(project_id, **kwargs),
key_id="branch",
)
def branch_delete(self, project_id: str, branch_id: str, **kwargs):
return ItemView(
self.resources.branches.delete(project_id, branch_id, **kwargs),
key_id="branch",
)
def branch_update(self, project_id: str, branch_id: str, **kwargs):
# TODO: untested.
return ItemView(
self.resources.branches.update(project_id, branch_id, **kwargs),
key_id="branch",
)
def branch_rename(self, project_id: str, branch_id: str, **kwargs):
# TODO: untested.
return ItemView(
self.resources.branches.rename(project_id, branch_id, **kwargs),
key_id="branch",
)
def branch_add_compute(self, project_id: str, branch_id: str, **kwargs):
# TODO: untested.
return ItemView(
self.resources.branches.add_compute(project_id, branch_id, **kwargs),
key_id="branch",
)
def branch_remove_compute(self, project_id: str, branch_id: str, **kwargs):
# TODO: untested.
return ItemView(
self.resources.branches.remove_compute(project_id, branch_id, **kwargs),
key_id="branch",
)
def branch_set_primary(self, project_id: str, branch_id: str, **kwargs):
# TODO: untested.
return ItemView(
self.resources.branches.set_primary(project_id, branch_id, **kwargs),
key_id="branch",
)
def get_connection_string(self, project_id: str, branch_id: str, database_id: str):
# TODO: implement this.
return self.resources.databases.get_connection_string(
project_id,
branch_id,
database_id,
)
# def branch_create(self, project_id: str, **kwargs):
# return ItemView(
# self.resources.branches.create(project_id, **kwargs),
# key_id="branch",
# )
return cls(os.environ[NEON_API_KEY_ENVIRON])
File diff suppressed because it is too large Load Diff
+2
View File
@@ -0,0 +1,2 @@
# from .client import NeonClient
from .simple import NeonClient
+1
View File
@@ -0,0 +1 @@
__version__ = "0.1.0"
+219
View File
@@ -0,0 +1,219 @@
from .http_client import Neon_API_V2
from .resources import ResourceCollection
from . import models
class ItemView:
"""A view into a single item."""
def __init__(self, item, key_id=None):
self._item = item
self._key_id = key_id
@property
def item(self):
if self._key_id:
return getattr(self._item, self._key_id)
return self._item
def __getattr__(self, name):
return getattr(self.item, name)
def __setattr__(self, name, value):
if name == "_item":
return super().__setattr__(name, value)
return setattr(self.item, name, value)
def __str__(self):
return str(self.item)
def __repr__(self):
return repr(self.item)
def __eq__(self, other):
return self.item == other
def __ne__(self, other):
return self.item != other
class CollectionView:
"""A view into a collection of items."""
def __init__(self, collection, key_ids=None, collection_id=None):
self.pagination = None
if not key_ids:
key_ids = []
self._key_ids = key_ids
if collection_id:
self._collection = getattr(collection, collection_id)
try:
self.pagination = collection.pagination
except AttributeError:
pass
else:
self._collection = collection
def __iter__(self):
return iter(self._collection)
def __getitem__(self, key):
for k in self._key_ids:
for item in self._collection:
if str(getattr(item, k)) == str(key):
return item
return self._collection[key]
def __len__(self):
return len(self._collection)
def __repr__(self):
return repr(self._collection)
def __str__(self):
return repr(self._collection)
def __contains__(self, item):
return item in self._collection
def __eq__(self, other):
return self._collection == other
def __ne__(self, other):
return self._collection != other
class NeonClient:
def __init__(self, api_key: str, **kwargs):
self.api = Neon_API_V2(api_key, **kwargs)
self.resources = ResourceCollection(self.api)
def me(self):
return self.resources.users.get_current_user_info()
def api_keys(self):
"""Get a list of API keys."""
return CollectionView(self.resources.api_keys.get_list(), key_ids=["id"])
def projects(self, shared=False, **kwargs):
"""Get a list of projects."""
a = self.resources.projects.get_list(shared=shared, **kwargs)
print(a)
exit()
return CollectionView(
self.resources.projects.get_list(shared=shared, **kwargs),
key_ids=["id", "name"],
collection_id="projects",
)
def project(self, project_id: str, **kwargs):
"""Get a single project."""
return ItemView(
self.resources.projects.get(project_id, **kwargs), key_id="project"
)
def project_create(self, **kwargs):
return ItemView(self.resources.projects.create(**kwargs), key_id="project")
def project_delete(self, project_id: str, **kwargs):
return ItemView(
self.resources.projects.delete(project_id, **kwargs), key_id="projects"
)
def databases(self, project_id: str, branch_id: str, **kwargs):
return CollectionView(
self.resources.databases.get_list(project_id, branch_id, **kwargs),
key_ids=["id"],
collection_id="databases",
)
def database(self, project_id: str, database_id: str, **kwargs):
return ItemView(
self.resources.databases.get(project_id, database_id, **kwargs),
key_id="database",
)
def branches(self, project_id: str, **kwargs):
return CollectionView(
self.resources.branches.get_list(project_id, **kwargs),
key_ids=["id", "name"],
collection_id="branches",
)
def branch(self, project_id: str, branch_id: str):
return ItemView(
self.resources.branches.get(project_id, branch_id),
key_id="branch",
)
def branch_create(self, project_id: str, **kwargs):
return ItemView(
self.resources.branches.create(project_id, **kwargs),
key_id="branch",
)
def branch_delete(self, project_id: str, branch_id: str, **kwargs):
return ItemView(
self.resources.branches.delete(project_id, branch_id, **kwargs),
key_id="branch",
)
def branch_update(self, project_id: str, branch_id: str, **kwargs):
# TODO: untested.
return ItemView(
self.resources.branches.update(project_id, branch_id, **kwargs),
key_id="branch",
)
def branch_rename(self, project_id: str, branch_id: str, **kwargs):
# TODO: untested.
return ItemView(
self.resources.branches.rename(project_id, branch_id, **kwargs),
key_id="branch",
)
def branch_add_compute(self, project_id: str, branch_id: str, **kwargs):
# TODO: untested.
return ItemView(
self.resources.branches.add_compute(project_id, branch_id, **kwargs),
key_id="branch",
)
def branch_remove_compute(self, project_id: str, branch_id: str, **kwargs):
# TODO: untested.
return ItemView(
self.resources.branches.remove_compute(project_id, branch_id, **kwargs),
key_id="branch",
)
def branch_set_primary(self, project_id: str, branch_id: str, **kwargs):
# TODO: untested.
return ItemView(
self.resources.branches.set_primary(project_id, branch_id, **kwargs),
key_id="branch",
)
def get_connection_string(self, project_id: str, branch_id: str, database_id: str):
# TODO: implement this.
return self.resources.databases.get_connection_string(
project_id,
branch_id,
database_id,
)
# def branch_create(self, project_id: str, **kwargs):
# return ItemView(
# self.resources.branches.create(project_id, **kwargs),
# key_id="branch",
# )
+156
View File
@@ -0,0 +1,156 @@
import os
import requests
from requests.exceptions import HTTPError
from pydantic import BaseModel
from fastapi.encoders import jsonable_encoder
from . import models
from .client import ItemView, CollectionView
from .exceptions import NeonClientException
from .__version__ import __version__
from .resources import PagedOperationsResponse, PagedProjectsResponse
from .utils import squash
NEON_ENVIRON_NAME = "NEON_API_KEY"
class NeonClient:
def __init__(
self, api_key: str, *, base_url: str = "https://console.neon.tech/api/v2/"
):
self.api_key = api_key
self.base_url = base_url
self.session = requests.Session()
self.user_agent = f"neon-client/{__version__}"
@classmethod
def from_environ(cls, environ=os.environ.copy(), **kwargs):
"""Create a new NeonClient instance from environment variables.
Args:
environ (dict, optional): The environment variables to use. Defaults to os.environ.copy().
**kwargs: Additional keyword arguments to be passed to the NeonClient constructor.
Returns:
NeonClient: The new NeonClient instance.
"""
return cls(environ[NEON_ENVIRON_NAME], **kwargs)
def url_join(self, *args):
"""Join URL path segments.
Args:
*args: The path segments to join.
Returns:
str: The joined path.
"""
return "/".join(args)
def request(
self,
method: str,
path: str,
data: dict | None = None,
*,
request_model: BaseModel | None = None,
response_model: BaseModel | None = None,
response_is_array=False,
check_status_code=True,
_debug=False,
**kwargs,
):
"""
Sends an HTTP request to the specified path using the specified method.
Args:
method (str): The HTTP method to use for the request.
path (str): The path to send the request to.
request_model (BaseModel, optional): The model to serialize the request body from. Defaults to None.
response_model (BaseModel, optional): The model to deserialize the response into. Defaults to None.
response_is_array (bool or str, optional): Indicates whether the response is a list of items. If a string is provided, it is used as the key to access the list in the response JSON. Defaults to False.
check_status_code (bool, optional): Indicates whether to check the status code of the response and raise an exception if it is not successful. Defaults to True.
**kwargs: Additional keyword arguments to be passed to the request.
Returns:
The deserialized response if a response model is provided, otherwise the raw response object.
"""
# Set HTTP headers for outgoing requests.
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {self.api_key}"
headers["Accept"] = "application/json"
headers["Content-Type"] = "application/json"
headers["User-Agent"] = self.user_agent
r = self.session.request(
method, self.base_url + path, headers=headers, **kwargs
)
if check_status_code:
# TODO: add custom exception classes here.
try:
r.raise_for_status()
except requests.exceptions.HTTPError:
raise NeonClientException(r.text)
if response_model:
if response_is_array:
# Shortcut for when the response is a list of items.
if type(response_is_array) == "str":
response_parsed = [
response_model.model_construct(**item)
for item in r.json()[response_is_array]
]
elif response_is_array is True:
response_parsed = [
response_model.model_construct(**item) for item in r.json()
]
else:
response_parsed = response_model.model_construct(**r.json())
if _debug:
print(r, r.json())
return response_parsed
else:
return r
def me(self):
return self.request(
method="GET",
path=self.url_join("users", "me"),
response_model=models.CurrentUserInfoResponse,
)
def api_keys(self):
"""Get a list of API keys."""
r = self.request(
method="GET",
path="api_keys",
response_model=models.ApiKeysListResponseItem,
response_is_array=True,
_debug=True,
)
return CollectionView(r, key_ids=["id", "name"])
def projects(
self,
*,
cursor: int | None = None,
limit: int | None = None,
shared: bool = False,
):
"""Get a list of projects."""
# Pagination support.
projects_params = squash({"cursor": cursor, "limit": limit})
r = self.request(
method="GET",
path=(self.api.url_join("projects", "shared") if shared else "projects"),
params=projects_params,
response_model=PagedProjectsResponse,
)
return CollectionView(r, key_ids=["id", "name"], collection_id="projects")
@@ -3,6 +3,16 @@ from typing import List, Dict, Any, Union, Optional
from pydantic import BaseModel
def squash(obj):
new_obj = {}
for k, v in obj.items():
if isinstance(v, dict):
new_obj.update(squash(v))
else:
if v:
new_obj[k] = v
def validate_obj_model(parameter_name: str, model: BaseModel):
"""A decorator that validates the 'obj' argument against the specified model."""