@task
async def ml_get_datastore(
    ml_credentials: "AzureMlCredentials",
    # PEP 484: implicit Optional is deprecated — spell the None out.
    datastore_name: Union[str, None] = None,
) -> Datastore:
    """
    Gets the Datastore within the Workspace.

    Args:
        ml_credentials: Credentials to use for authentication with Azure.
        datastore_name: The name of the Datastore. If `None`, then the
            default Datastore of the Workspace is returned.

    Returns:
        The resolved `Datastore` object.

    Example:
        Get Datastore object
        ```python
        from prefect import flow
        from prefect_azure import AzureMlCredentials
        from prefect_azure.ml_datastore import ml_get_datastore

        @flow
        def example_ml_get_datastore_flow():
            ml_credentials = AzureMlCredentials(
                tenant_id="tenant_id",
                service_principal_id="service_principal_id",
                service_principal_password="service_principal_password",
                subscription_id="subscription_id",
                resource_group="resource_group",
                workspace_name="workspace_name",
            )
            results = ml_get_datastore(ml_credentials, datastore_name="datastore_name")
            return results
        ```
    """
    logger = get_run_logger()
    logger.info("Getting datastore %s", datastore_name)

    # _get_datastore handles the None case by returning the workspace default.
    result = await _get_datastore(ml_credentials, datastore_name)
    return result
@task
def ml_list_datastores(ml_credentials: "AzureMlCredentials") -> Dict:
    """
    Lists the Datastores in the Workspace.

    Args:
        ml_credentials: Credentials to use for authentication with Azure.

    Returns:
        The workspace's datastores mapping.

    Example:
        List Datastore objects
        ```python
        from prefect import flow
        from prefect_azure import AzureMlCredentials
        from prefect_azure.ml_datastore import ml_list_datastores

        @flow
        def example_ml_list_datastores_flow():
            ml_credentials = AzureMlCredentials(
                tenant_id="tenant_id",
                service_principal_id="service_principal_id",
                service_principal_password="service_principal_password",
                subscription_id="subscription_id",
                resource_group="resource_group",
                workspace_name="workspace_name",
            )
            results = ml_list_datastores(ml_credentials)
            return results
        ```
    """
    logger = get_run_logger()
    logger.info("Listing datastores")

    # The Workspace object exposes its registered datastores as a property.
    return ml_credentials.get_workspace().datastores
@task
async def ml_register_datastore_blob_container(
    container_name: str,
    ml_credentials: "AzureMlCredentials",
    blob_storage_credentials: "AzureBlobStorageCredentials",
    # PEP 484: implicit Optional is deprecated — spell the None out.
    datastore_name: Union[str, None] = None,
    create_container_if_not_exists: bool = False,
    overwrite: bool = False,
    set_as_default: bool = False,
) -> "AzureBlobDatastore":
    """
    Registers a Azure Blob Storage container as a
    Datastore in a Azure ML service Workspace.

    Args:
        container_name: The name of the container.
        ml_credentials: Credentials to use for authentication with Azure ML.
        blob_storage_credentials: Credentials to use for authentication
            with Azure Blob Storage.
        datastore_name: The name of the datastore. If not defined, the
            container name will be used.
        create_container_if_not_exists: Create a container, if one does not
            exist with the given name.
        overwrite: Overwrite an existing datastore. If the datastore does
            not exist, it will be created.
        set_as_default: Set the created Datastore as the default datastore
            for the Workspace.

    Returns:
        The registered `AzureBlobDatastore`.

    Example:
        Upload Datastore object
        ```python
        from prefect import flow
        from prefect_azure import AzureMlCredentials
        from prefect_azure.ml_datastore import ml_register_datastore_blob_container

        @flow
        def example_ml_register_datastore_blob_container_flow():
            ml_credentials = AzureMlCredentials(
                tenant_id="tenant_id",
                service_principal_id="service_principal_id",
                service_principal_password="service_principal_password",
                subscription_id="subscription_id",
                resource_group="resource_group",
                workspace_name="workspace_name",
            )
            blob_storage_credentials = AzureBlobStorageCredentials("connection_string")
            result = ml_register_datastore_blob_container(
                "container",
                ml_credentials,
                blob_storage_credentials,
                datastore_name="datastore_name"
            )
            return result
        ```
    """
    logger = get_run_logger()

    if datastore_name is None:
        datastore_name = container_name

    # Fix: the collapsed source had this message string broken across a line
    # boundary; restore it as a single literal.
    logger.info(
        "Registering %s container into %s datastore", container_name, datastore_name
    )

    workspace = ml_credentials.get_workspace()

    # Only the account name/key are needed for registration; extract them
    # from the blob service client's credential while the client is open.
    async with blob_storage_credentials.get_client() as blob_service_client:
        credential = blob_service_client.credential
        account_name = credential.account_name
        account_key = credential.account_key

    partial_register = partial(
        Datastore.register_azure_blob_container,
        workspace=workspace,
        datastore_name=datastore_name,
        container_name=container_name,
        account_name=account_name,
        account_key=account_key,
        overwrite=overwrite,
        create_if_not_exists=create_container_if_not_exists,
    )
    # register_azure_blob_container is blocking; run it off the event loop.
    result = await to_thread.run_sync(partial_register)

    if set_as_default:
        result.set_as_default()

    return result
# NOTE(review): the following was extraction residue — a parameter table
# duplicated from the ml_upload_datastore docstring below; reconstructed
# here as comments so the module stays valid Python.
#
# target_path (Union[str, Path], default None):  # name inferred — the
#     table's first header row was cut off; matches ml_upload_datastore
#     The location in the blob container to upload to. If
#     None, then upload to root.
# relative_root (Union[str, Path], default None):
#     The root from which is used to determine the path of
#     the files in the blob. For example, if we upload /path/to/file.txt,
#     and we define base path to be /path, when file.txt is uploaded
#     to the blob storage, it will have the path of /to/file.txt.
# datastore_name (str, default None):
#     The name of the Datastore. If None, then the
#     default Datastore of the Workspace is returned.
@task
async def ml_upload_datastore(
    path: Union[str, Path, List[Union[str, Path]]],
    ml_credentials: "AzureMlCredentials",
    # PEP 484: implicit Optional is deprecated — spell the None out.
    target_path: Union[str, Path, None] = None,
    relative_root: Union[str, Path, None] = None,
    datastore_name: Union[str, None] = None,
    overwrite: bool = False,
) -> "DataReference":
    """
    Uploads local files to a Datastore.

    Args:
        path: The path to a single file, single directory,
            or a list of path to files to be uploaded.
        ml_credentials: Credentials to use for authentication with Azure.
        target_path: The location in the blob container to upload to. If
            None, then upload to root.
        relative_root: The root from which is used to determine the path of
            the files in the blob. For example, if we upload /path/to/file.txt,
            and we define base path to be /path, when file.txt is uploaded
            to the blob storage, it will have the path of /to/file.txt.
        datastore_name: The name of the Datastore. If `None`, then the
            default Datastore of the Workspace is returned.
        overwrite: Overwrite existing file(s).

    Returns:
        The `DataReference` for the uploaded data.

    Example:
        Upload Datastore object
        ```python
        from prefect import flow
        from prefect_azure import AzureMlCredentials
        from prefect_azure.ml_datastore import ml_upload_datastore

        @flow
        def example_ml_upload_datastore_flow():
            ml_credentials = AzureMlCredentials(
                tenant_id="tenant_id",
                service_principal_id="service_principal_id",
                service_principal_password="service_principal_password",
                subscription_id="subscription_id",
                resource_group="resource_group",
                workspace_name="workspace_name",
            )
            result = ml_upload_datastore(
                "path/to/dir/or/file",
                ml_credentials,
                datastore_name="datastore_name"
            )
            return result
        ```
    """
    logger = get_run_logger()
    # Fix: the collapsed source had this message string broken across a line
    # boundary; restore it as a single literal.
    logger.info("Uploading %s into %s datastore", path, datastore_name)

    datastore = await _get_datastore(ml_credentials, datastore_name)

    # Normalize Path objects to strings before handing off to the SDK.
    if isinstance(path, Path):
        path = str(path)
    elif isinstance(path, list):
        # Fix: the original converted list elements only when path[0] was a
        # Path, leaving later Path elements unconverted in mixed lists.
        path = [str(p) if isinstance(p, Path) else p for p in path]
    if isinstance(target_path, Path):
        target_path = str(target_path)
    if isinstance(relative_root, Path):
        relative_root = str(relative_root)

    if isinstance(path, str) and os.path.isdir(path):
        # A single directory: upload its contents wholesale.
        partial_upload = partial(
            datastore.upload,
            src_dir=path,
            target_path=target_path,
            overwrite=overwrite,
            show_progress=False,
        )
    else:
        # One file or an explicit list of files.
        partial_upload = partial(
            datastore.upload_files,
            files=path if isinstance(path, list) else [path],
            relative_root=relative_root,
            target_path=target_path,
            overwrite=overwrite,
            show_progress=False,
        )
    # upload/upload_files are blocking; run them off the event loop.
    result = await to_thread.run_sync(partial_upload)
    return result