classConnectionComponents(BaseModel):""" Parameters to use to create a SQLAlchemy engine URL. Attributes: driver: The driver name to use. database: The name of the database to use. username: The user name used to authenticate. password: The password used to authenticate. host: The host address of the database. port: The port to connect to the database. query: A dictionary of string keys to string values to be passed to the dialect and/or the DBAPI upon connect. """driver:Union[AsyncDriver,SyncDriver,str]=Field(default=...,description="The driver name to use.")database:str=Field(default=...,description="The name of the database to use.")username:Optional[str]=Field(default=None,description="The user name used to authenticate.")password:Optional[SecretStr]=Field(default=None,description="The password used to authenticate.")host:Optional[str]=Field(default=None,description="The host address of the database.")port:Optional[str]=Field(default=None,description="The port to connect to the database.")query:Optional[Dict[str,str]]=Field(default=None,description=("A dictionary of string keys to string values to be passed to the dialect ""and/or the DBAPI upon connect. To specify non-string parameters to a ""Python DBAPI directly, use connect_args."),)defcreate_url(self)->URL:""" Create a fully formed connection URL. Returns: The SQLAlchemy engine URL. """driver=self.driverdrivername=driver.valueifisinstance(driver,Enum)elsedriverpassword=self.password.get_secret_value()ifself.passwordelseNoneurl_params=dict(drivername=drivername,username=self.username,password=password,database=self.database,host=self.host,port=self.port,query=self.query,)returnURL.create(**{url_key:url_paramforurl_key,url_paraminurl_params.items()ifurl_paramisnotNone})
defcreate_url(self)->URL:""" Create a fully formed connection URL. Returns: The SQLAlchemy engine URL. """driver=self.driverdrivername=driver.valueifisinstance(driver,Enum)elsedriverpassword=self.password.get_secret_value()ifself.passwordelseNoneurl_params=dict(drivername=drivername,username=self.username,password=password,database=self.database,host=self.host,port=self.port,query=self.query,)returnURL.create(**{url_key:url_paramforurl_key,url_paraminurl_params.items()ifurl_paramisnotNone})
A dictionary of string keys to string values to be passed to
the dialect and/or the DBAPI upon connect. To specify non-string
parameters to a Python DBAPI directly, use connect_args.
url
Optional[AnyUrl]
Manually create and provide a URL to create the engine,
this is useful for external dialects, e.g. Snowflake, because some
of the params, such as "warehouse", is not directly supported in
the vanilla sqlalchemy.engine.URL.create method; do not provide
this alongside with other URL params as it will raise a ValueError.
connect_args
Optional[Dict[str, Any]]
The options which will be passed directly to the
DBAPI's connect() method as additional keyword arguments.
classDatabaseCredentials(Block):""" Block used to manage authentication with a database. Attributes: driver: The driver name, e.g. "postgresql+asyncpg" database: The name of the database to use. username: The user name used to authenticate. password: The password used to authenticate. host: The host address of the database. port: The port to connect to the database. query: A dictionary of string keys to string values to be passed to the dialect and/or the DBAPI upon connect. To specify non-string parameters to a Python DBAPI directly, use connect_args. url: Manually create and provide a URL to create the engine, this is useful for external dialects, e.g. Snowflake, because some of the params, such as "warehouse", is not directly supported in the vanilla `sqlalchemy.engine.URL.create` method; do not provide this alongside with other URL params as it will raise a `ValueError`. connect_args: The options which will be passed directly to the DBAPI's connect() method as additional keyword arguments. Example: Load stored database credentials: ```python from prefect_sqlalchemy import DatabaseCredentials database_block = DatabaseCredentials.load("BLOCK_NAME") ``` """_block_type_name="Database Credentials"_logo_url="https://cdn.sanity.io/images/3ugk85nk/production/fb3f4debabcda1c5a3aeea4f5b3f94c28845e23e-250x250.png"# noqa_documentation_url="https://prefecthq.github.io/prefect-sqlalchemy/credentials/#prefect_sqlalchemy.credentials.DatabaseCredentials"# noqadriver:Optional[Union[AsyncDriver,SyncDriver,str]]=Field(default=None,description="The driver name to use.")username:Optional[str]=Field(default=None,description="The user name used to authenticate.")password:Optional[SecretStr]=Field(default=None,description="The password used to authenticate.")database:Optional[str]=Field(default=None,description="The name of the database to use.")host:Optional[str]=Field(default=None,description="The host address of the database.")port:Optional[str]=Field(default=None,description="The port to connect to the database.")query:Optional[Dict[str,str]]=Field(default=None,description=("A dictionary of string keys to string values to be passed to the dialect ""and/or the DBAPI upon connect. To specify non-string parameters to a ""Python DBAPI directly, use connect_args."),)url:Optional[AnyUrl]=Field(default=None,description=("Manually create and provide a URL to create the engine, this is useful ""for external dialects, e.g. Snowflake, because some of the params, ""such as 'warehouse', is not directly supported in the vanilla ""`sqlalchemy.engine.URL.create` method; do not provide this ""alongside with other URL params as it will raise a `ValueError`."),)connect_args:Optional[Dict[str,Any]]=Field(default=None,description=("The options which will be passed directly to the DBAPI's connect() ""method as additional keyword arguments."),)defblock_initialization(self):""" Initializes the engine. """warnings.warn("DatabaseCredentials is now deprecated and will be removed March 2023; ""please use SqlAlchemyConnector instead.",DeprecationWarning,)ifisinstance(self.driver,AsyncDriver):drivername=self.driver.valueself._driver_is_async=Trueelifisinstance(self.driver,SyncDriver):drivername=self.driver.valueself._driver_is_async=Falseelse:drivername=self.driverself._driver_is_async=drivernameinAsyncDriver._value2member_map_url_params=dict(drivername=drivername,username=self.username,password=self.password.get_secret_value()ifself.passwordelseNone,database=self.database,host=self.host,port=self.port,query=self.query,)ifnotself.url:required_url_keys=("drivername","database")ifnotall(url_params[key]forkeyinrequired_url_keys):required_url_keys=("driver","database")raiseValueError(f"If the `url` is not provided, "f"all of these URL params are required: "f"{required_url_keys}")self.rendered_url=URL.create(**{url_key:url_paramforurl_key,url_paraminurl_params.items()ifurl_paramisnotNone})# from paramselse:ifany(valforvalinurl_params.values()):raiseValueError(f"The `url` should not be provided "f"alongside any of these URL params: "f"{url_params.keys()}")self.rendered_url=make_url(str(self.url))defget_engine(self)->Union["Connection","AsyncConnection"]:""" Returns an authenticated engine that can be used to query from databases. Returns: The authenticated SQLAlchemy Connection / AsyncConnection. Examples: Create an asynchronous engine to PostgreSQL using URL params. ```python from prefect import flow from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver @flow def sqlalchemy_credentials_flow(): sqlalchemy_credentials = DatabaseCredentials( driver=AsyncDriver.POSTGRESQL_ASYNCPG, username="prefect", password="prefect_password", database="postgres" ) print(sqlalchemy_credentials.get_engine()) sqlalchemy_credentials_flow() ``` Create a synchronous engine to Snowflake using the `url` kwarg. ```python from prefect import flow from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver @flow def sqlalchemy_credentials_flow(): url = ( "snowflake://<user_login_name>:<password>" "@<account_identifier>/<database_name>" "?warehouse=<warehouse_name>" ) sqlalchemy_credentials = DatabaseCredentials(url=url) print(sqlalchemy_credentials.get_engine()) sqlalchemy_credentials_flow() ``` """engine_kwargs=dict(url=self.rendered_url,connect_args=self.connect_argsor{},poolclass=NullPool,)ifself._driver_is_async:engine=create_async_engine(**engine_kwargs)else:engine=create_engine(**engine_kwargs)returnengineclassConfig:"""Configuration of pydantic."""# Support serialization of the 'URL' typearbitrary_types_allowed=Truejson_encoders={URL:lambdau:u.render_as_string()}defdict(self,*args,**kwargs)->Dict:""" Convert to a dictionary. """# Support serialization of the 'URL' typed=super().dict(*args,**kwargs)d["rendered_url"]=SecretStr(self.rendered_url.render_as_string(hide_password=False))returnd
classConfig:"""Configuration of pydantic."""# Support serialization of the 'URL' typearbitrary_types_allowed=Truejson_encoders={URL:lambdau:u.render_as_string()}
defblock_initialization(self):""" Initializes the engine. """warnings.warn("DatabaseCredentials is now deprecated and will be removed March 2023; ""please use SqlAlchemyConnector instead.",DeprecationWarning,)ifisinstance(self.driver,AsyncDriver):drivername=self.driver.valueself._driver_is_async=Trueelifisinstance(self.driver,SyncDriver):drivername=self.driver.valueself._driver_is_async=Falseelse:drivername=self.driverself._driver_is_async=drivernameinAsyncDriver._value2member_map_url_params=dict(drivername=drivername,username=self.username,password=self.password.get_secret_value()ifself.passwordelseNone,database=self.database,host=self.host,port=self.port,query=self.query,)ifnotself.url:required_url_keys=("drivername","database")ifnotall(url_params[key]forkeyinrequired_url_keys):required_url_keys=("driver","database")raiseValueError(f"If the `url` is not provided, "f"all of these URL params are required: "f"{required_url_keys}")self.rendered_url=URL.create(**{url_key:url_paramforurl_key,url_paraminurl_params.items()ifurl_paramisnotNone})# from paramselse:ifany(valforvalinurl_params.values()):raiseValueError(f"The `url` should not be provided "f"alongside any of these URL params: "f"{url_params.keys()}")self.rendered_url=make_url(str(self.url))
defdict(self,*args,**kwargs)->Dict:""" Convert to a dictionary. """# Support serialization of the 'URL' typed=super().dict(*args,**kwargs)d["rendered_url"]=SecretStr(self.rendered_url.render_as_string(hide_password=False))returnd
defget_engine(self)->Union["Connection","AsyncConnection"]:""" Returns an authenticated engine that can be used to query from databases. Returns: The authenticated SQLAlchemy Connection / AsyncConnection. Examples: Create an asynchronous engine to PostgreSQL using URL params. ```python from prefect import flow from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver @flow def sqlalchemy_credentials_flow(): sqlalchemy_credentials = DatabaseCredentials( driver=AsyncDriver.POSTGRESQL_ASYNCPG, username="prefect", password="prefect_password", database="postgres" ) print(sqlalchemy_credentials.get_engine()) sqlalchemy_credentials_flow() ``` Create a synchronous engine to Snowflake using the `url` kwarg. ```python from prefect import flow from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver @flow def sqlalchemy_credentials_flow(): url = ( "snowflake://<user_login_name>:<password>" "@<account_identifier>/<database_name>" "?warehouse=<warehouse_name>" ) sqlalchemy_credentials = DatabaseCredentials(url=url) print(sqlalchemy_credentials.get_engine()) sqlalchemy_credentials_flow() ``` """engine_kwargs=dict(url=self.rendered_url,connect_args=self.connect_argsor{},poolclass=NullPool,)ifself._driver_is_async:engine=create_async_engine(**engine_kwargs)else:engine=create_engine(**engine_kwargs)returnengine