class BlockStorageAdapter:
    """
    Wraps a storage block so that it can act as a runner storage object.

    The wrapped object must be a `Block` instance that exposes a
    `get_directory` method; `pull_code` uses that method to download the
    block's contents into `destination`.
    """

    def __init__(
        self,
        block: Union[ReadableDeploymentStorage, WritableDeploymentStorage],
        pull_interval: Optional[int] = 60,
    ):
        self._block = block
        self._pull_interval = pull_interval
        self._storage_base_path = Path.cwd()

        if not isinstance(block, Block):
            raise TypeError(
                f"Expected a block object. Received a {type(block).__name__!r} object."
            )
        if not hasattr(block, "get_directory"):
            raise ValueError("Provided block must have a `get_directory` method.")

        # Saved blocks get a readable "<type-slug>-<document-name>" directory
        # name; unsaved blocks fall back to a random UUID string.
        document_name = block._block_document_name
        if document_name:
            self._name = f"{block.get_block_type_slug()}-{document_name}"
        else:
            self._name = str(uuid4())

    def set_base_path(self, path: Path):
        """Set the local directory under which `destination` is placed."""
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """Seconds between pulls, or None for a one-time sync."""
        return self._pull_interval

    @property
    def destination(self) -> Path:
        """Local directory the block's contents are downloaded into."""
        return self._storage_base_path / self._name

    async def pull_code(self):
        """Create `destination` if needed, then download the block's directory into it."""
        target = self.destination
        if not target.exists():
            target.mkdir(parents=True, exist_ok=True)
        await self._block.get_directory(local_path=str(target))

    def to_pull_step(self) -> dict:
        """
        Describe this storage as a deployment pull step.

        If the block defines `get_pull_step`, its result is returned as-is.
        Otherwise a `pull_with_block` step referencing the saved block is built.

        Raises:
            BlockNotSavedError: If the block has no document name (i.e. was
                never saved) and provides no `get_pull_step` of its own.
        """
        block = self._block
        # Blocks get the chance to implement their own pull step.
        if hasattr(block, "get_pull_step"):
            return block.get_pull_step()

        document_name = block._block_document_name
        if not document_name:
            raise BlockNotSavedError(
                "Block must be saved with `.save()` before it can be converted to a"
                " pull step."
            )
        return {
            "prefect.deployments.steps.pull_with_block": {
                "block_type_slug": block.get_block_type_slug(),
                "block_document_name": document_name,
            }
        }

    def __eq__(self, __value) -> bool:
        """Two adapters are equal when they wrap equal blocks."""
        if not isinstance(__value, BlockStorageAdapter):
            return False
        return self._block == __value._block
class GitRepository:
    """
    Pulls the contents of a git repository to the local filesystem.

    Parameters:
        url: The URL of the git repository to pull from
        credentials: Credentials to use when pulling from the repository. May be
            a dictionary, a `GitCredentials` object, or a credentials block. If a
            dictionary containing a username is provided, an access token or a
            password must also be provided.
        name: The name of the repository. If not provided, the name will be
            inferred from the repository URL (suffixed with the branch, if one is
            given).
        branch: The branch to pull from. If not provided, no branch is passed to
            git, so git's default applies (for a clone, the remote's default
            branch).
        include_submodules: Whether to also clone/pull git submodules
            (passes `--recurse-submodules` to git).
        pull_interval: The interval in seconds at which to pull contents from
            remote storage to local storage. If None, remote storage will perform
            a one-time sync.

    Examples:
        Pull the contents of a private git repository to the local filesystem:

        ```python
        from prefect.runner.storage import GitRepository

        storage = GitRepository(
            url="https://github.com/org/repo.git",
            credentials={"username": "oauth2", "access_token": "my-access-token"},
        )

        await storage.pull_code()
        ```
    """

    def __init__(
        self,
        url: str,
        credentials: Union[GitCredentials, Block, Dict[str, Any], None] = None,
        name: Optional[str] = None,
        branch: Optional[str] = None,
        include_submodules: bool = False,
        pull_interval: Optional[int] = 60,
    ):
        if credentials is None:
            credentials = {}

        if (
            isinstance(credentials, dict)
            and credentials.get("username")
            and not (credentials.get("access_token") or credentials.get("password"))
        ):
            raise ValueError(
                "If a username is provided, an access token or password must also be"
                " provided."
            )
        self._url = url
        self._branch = branch
        self._credentials = credentials
        self._include_submodules = include_submodules

        # Infer the directory name from the last path segment of the URL.
        # Trailing slashes are ignored so the name is never empty, and only a
        # trailing ".git" suffix is removed so names such as "user.github.io"
        # are preserved (a plain `.replace(".git", "")` would yield "userhub.io").
        repo_name = urlparse(url).path.rstrip("/").split("/")[-1]
        if repo_name.endswith(".git"):
            repo_name = repo_name[: -len(".git")]
        default_name = f"{repo_name}-{branch}" if branch else repo_name
        self._name = name or default_name
        self._logger = get_logger(f"runner.storage.git-repository.{self._name}")
        self._storage_base_path = Path.cwd()
        self._pull_interval = pull_interval

    @property
    def destination(self) -> Path:
        """Local directory the repository is cloned into."""
        return self._storage_base_path / self._name

    def set_base_path(self, path: Path):
        """Set the local directory under which `destination` is placed."""
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """Seconds between pulls, or None for a one-time sync."""
        return self._pull_interval

    @property
    def _repository_url_with_credentials(self) -> str:
        """
        The repository URL with formatted credentials embedded in its netloc.

        Credentials are only embedded for `https` URLs; with no credentials, a
        non-https scheme, or when no token could be formatted, the configured
        URL is returned unchanged.
        """
        if not self._credentials:
            return self._url

        url_components = urlparse(self._url)

        credentials = (
            self._credentials.dict()
            if isinstance(self._credentials, Block)
            else deepcopy(self._credentials)
        )

        # Resolve secret wrappers to plain values so they can be formatted.
        for key, value in credentials.items():
            if isinstance(value, Secret):
                credentials[key] = value.get()
            elif isinstance(value, SecretStr):
                credentials[key] = value.get_secret_value()

        formatted_credentials = _format_token_from_credentials(
            url_components.netloc, credentials
        )
        if url_components.scheme == "https" and formatted_credentials is not None:
            updated_components = url_components._replace(
                netloc=f"{formatted_credentials}@{url_components.netloc}"
            )
            return urlunparse(updated_components)
        return self._url

    async def pull_code(self):
        """
        Pulls the contents of the configured repository to the local filesystem.

        If a git checkout already exists at `destination`, its `origin` remote is
        checked against the configured URL and the latest changes are pulled. If
        that pull fails, the checkout is removed and re-cloned. Without an
        existing checkout, the repository is cloned.

        Raises:
            ValueError: If the existing checkout's origin URL does not match the
                configured repository URL.
        """
        self._logger.debug(
            "Pulling contents from repository '%s' to '%s'...",
            self._name,
            self.destination,
        )

        git_dir = self.destination / ".git"
        if not git_dir.exists():
            await self._clone_repo()
            return

        # Check if the existing repository matches the configured repository
        result = await run_process(
            ["git", "config", "--get", "remote.origin.url"],
            cwd=str(self.destination),
        )
        existing_repo_url = None
        if result.stdout is not None:
            existing_repo_url = _strip_auth_from_url(result.stdout.decode().strip())

        # Strip auth from both sides so a configured URL that itself embeds
        # credentials still matches the (auth-stripped) stored remote.
        if existing_repo_url != _strip_auth_from_url(self._url):
            raise ValueError(
                f"The existing repository at {str(self.destination)} "
                f"does not match the configured repository {self._url}"
            )

        self._logger.debug("Pulling latest changes from origin/%s", self._branch)

        # Update the existing repository
        cmd = ["git", "pull", "origin"]
        if self._branch:
            cmd += [self._branch]
        if self._include_submodules:
            cmd += ["--recurse-submodules"]
        cmd += ["--depth", "1"]
        try:
            await run_process(cmd, cwd=self.destination)
            self._logger.debug("Successfully pulled latest changes")
        except subprocess.CalledProcessError as exc:
            # Log the numeric exit status; `str(exc)` is the whole command
            # description, not an exit code. Then fall back to a fresh clone.
            self._logger.error(
                "Failed to pull latest changes with exit code %s", exc.returncode
            )
            shutil.rmtree(self.destination)
            await self._clone_repo()

    async def _clone_repo(self):
        """
        Clones the repository into the local destination.

        Raises:
            RuntimeError: If `git clone` fails. The underlying
                `CalledProcessError` is chained only when no credentials are
                configured, so a command containing an access token is not
                exposed.
        """
        self._logger.debug("Cloning repository %s", self._url)

        repository_url = self._repository_url_with_credentials

        cmd = [
            "git",
            "clone",
            repository_url,
        ]
        if self._branch:
            cmd += ["--branch", self._branch]
        if self._include_submodules:
            cmd += ["--recurse-submodules"]

        # Limit git history and set path to clone to
        cmd += ["--depth", "1", str(self.destination)]

        try:
            await run_process(cmd)
        except subprocess.CalledProcessError as exc:
            # Hide the command used to avoid leaking the access token
            exc_chain = None if self._credentials else exc
            raise RuntimeError(
                f"Failed to clone repository {self._url!r} with exit code"
                f" {exc.returncode}."
            ) from exc_chain

    def __eq__(self, __value) -> bool:
        """Repositories are equal when URL, branch, and name all match."""
        if isinstance(__value, GitRepository):
            return (
                self._url == __value._url
                and self._branch == __value._branch
                and self._name == __value._name
            )
        return False

    def __repr__(self) -> str:
        return (
            f"GitRepository(name={self._name!r} repository={self._url!r},"
            f" branch={self._branch!r})"
        )

    def to_pull_step(self) -> Dict:
        """
        Describe this repository as a `git_clone` deployment pull step.

        Block credentials, and Secret access tokens in a credentials dictionary,
        are replaced with block placeholders wrapped in double braces.

        Raises:
            ValueError: If a credentials dictionary holds a non-Secret access token.
        """
        pull_step = {
            "prefect.deployments.steps.git_clone": {
                "repository": self._url,
                "branch": self._branch,
            }
        }
        if isinstance(self._credentials, Block):
            pull_step["prefect.deployments.steps.git_clone"][
                "credentials"
            ] = f"{{{{{self._credentials.get_block_placeholder()}}}}}"
        elif isinstance(self._credentials, dict):
            if isinstance(self._credentials.get("access_token"), Secret):
                pull_step["prefect.deployments.steps.git_clone"]["credentials"] = {
                    **self._credentials,
                    "access_token": (
                        "{{"
                        f" {self._credentials['access_token'].get_block_placeholder()}}}}}"
                    ),
                }
            elif self._credentials.get("access_token") is not None:
                raise ValueError(
                    "Please save your access token as a Secret block before converting"
                    " this storage object to a pull step."
                )

        return pull_step
async def pull_code(self):
    """
    Pulls the contents of the configured repository to the local filesystem.

    If a git checkout already exists at `destination`, its `origin` remote is
    checked against the configured URL and the latest changes are pulled. If
    that pull fails, the checkout is removed and re-cloned. Without an existing
    checkout, the repository is cloned.

    Raises:
        ValueError: If the existing checkout's origin URL does not match the
            configured repository URL.
    """
    self._logger.debug(
        "Pulling contents from repository '%s' to '%s'...",
        self._name,
        self.destination,
    )

    git_dir = self.destination / ".git"
    if not git_dir.exists():
        await self._clone_repo()
        return

    # Check if the existing repository matches the configured repository
    result = await run_process(
        ["git", "config", "--get", "remote.origin.url"],
        cwd=str(self.destination),
    )
    existing_repo_url = None
    if result.stdout is not None:
        existing_repo_url = _strip_auth_from_url(result.stdout.decode().strip())

    # Strip auth from both sides so a configured URL that itself embeds
    # credentials still matches the (auth-stripped) stored remote.
    if existing_repo_url != _strip_auth_from_url(self._url):
        raise ValueError(
            f"The existing repository at {str(self.destination)} "
            f"does not match the configured repository {self._url}"
        )

    self._logger.debug("Pulling latest changes from origin/%s", self._branch)

    # Update the existing repository
    cmd = ["git", "pull", "origin"]
    if self._branch:
        cmd += [self._branch]
    if self._include_submodules:
        cmd += ["--recurse-submodules"]
    cmd += ["--depth", "1"]
    try:
        await run_process(cmd, cwd=self.destination)
        self._logger.debug("Successfully pulled latest changes")
    except subprocess.CalledProcessError as exc:
        # Log the numeric exit status; `str(exc)` is the whole command
        # description, not an exit code. Then fall back to a fresh clone.
        self._logger.error(
            "Failed to pull latest changes with exit code %s", exc.returncode
        )
        shutil.rmtree(self.destination)
        await self._clone_repo()
Pulls the contents of a remote storage location to the local filesystem.
Parameters:
| Name | Type | Description | Default |
|------|------|-------------|---------|
| `url` | `str` | The URL of the remote storage location to pull from. Supports `fsspec` URLs. Some protocols may require an additional `fsspec` dependency to be installed. Refer to the [`fsspec` docs](https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations) for more details. | *required* |
| `pull_interval` | `Optional[int]` | The interval in seconds at which to pull contents from remote storage to local storage. If None, remote storage will perform a one-time sync. | `60` |
| `**settings` | `Any` | Any additional settings to pass to the `fsspec` filesystem class. | `{}` |
Examples:
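Pull the contents of a remote storage location to the local filesystem:

```python
from prefect.runner.storage import RemoteStorage

storage = RemoteStorage(url="s3://my-bucket/my-folder")

await storage.pull_code()
```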
Pull the contents of a remote storage location to the local filesystem
with additional settings:
# Example: pull an S3 folder using credentials stored in Secret blocks.
# Top-level `await` is a SyntaxError in a regular Python module, so the
# coroutine is driven with `asyncio.run`. The Secrets are loaded before any
# event loop is running.
import asyncio

from prefect.blocks.system import Secret
from prefect.runner.storage import RemoteStorage

storage = RemoteStorage(
    url="s3://my-bucket/my-folder",
    # Use Secret blocks to keep credentials out of your code
    key=Secret.load("my-aws-access-key"),
    secret=Secret.load("my-aws-secret-key"),
)

asyncio.run(storage.pull_code())
class RemoteStorage:
    """
    Pulls the contents of a remote storage location to the local filesystem.

    Parameters:
        url: The URL of the remote storage location to pull from. Supports
            `fsspec` URLs. Some protocols may require an additional `fsspec`
            dependency to be installed. Refer to the
            [`fsspec` docs](https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations)
            for more details.
        pull_interval: The interval in seconds at which to pull contents from
            remote storage to local storage. If None, remote storage will perform
            a one-time sync.
        **settings: Any additional settings to pass to the `fsspec` filesystem
            class.

    Examples:
        Pull the contents of a remote storage location to the local filesystem:

        ```python
        from prefect.runner.storage import RemoteStorage

        storage = RemoteStorage(url="s3://my-bucket/my-folder")

        await storage.pull_code()
        ```

        Pull the contents of a remote storage location to the local filesystem
        with additional settings:

        ```python
        from prefect.runner.storage import RemoteStorage
        from prefect.blocks.system import Secret

        storage = RemoteStorage(
            url="s3://my-bucket/my-folder",
            # Use Secret blocks to keep credentials out of your code
            key=Secret.load("my-aws-access-key"),
            secret=Secret.load("my-aws-secret-key"),
        )

        await storage.pull_code()
        ```
    """

    def __init__(
        self,
        url: str,
        pull_interval: Optional[int] = 60,
        **settings: Any,
    ):
        self._url = url
        self._settings = settings
        self._logger = get_logger("runner.storage.remote-storage")
        self._storage_base_path = Path.cwd()
        self._pull_interval = pull_interval

    @staticmethod
    def _get_required_package_for_scheme(scheme: str) -> Optional[str]:
        """
        Best-effort guess of the package providing the fsspec implementation
        for `scheme`, or None when it cannot be determined.
        """
        # Attempt to discover the package name for the given scheme from
        # fsspec's registry.
        known_implementation = fsspec.registry.get(scheme)
        if known_implementation:
            return known_implementation.__module__.split(".")[0]
        # If we don't know the implementation, try to guess it for some common
        # schemes.
        if scheme == "s3":
            return "s3fs"
        if scheme in ("gs", "gcs"):
            return "gcsfs"
        if scheme in ("abfs", "az"):
            return "adlfs"
        return None

    @property
    def _filesystem(self) -> fsspec.AbstractFileSystem:
        """
        An fsspec filesystem for the URL's scheme, built from `settings`.

        Block values anywhere in the settings are resolved first: via `.get()`
        if available, else `.value`, else `.dict()`.
        """
        scheme = urlsplit(self._url).scheme

        def replace_blocks_with_values(obj: Any) -> Any:
            if isinstance(obj, Block):
                if hasattr(obj, "get"):
                    return obj.get()
                if hasattr(obj, "value"):
                    return obj.value
                else:
                    return obj.dict()
            return obj

        settings_with_block_values = visit_collection(
            self._settings, replace_blocks_with_values, return_data=True
        )

        return fsspec.filesystem(scheme, **settings_with_block_values)

    def set_base_path(self, path: Path):
        """Set the local directory under which `destination` is placed."""
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """
        The interval at which contents from remote storage should be pulled to
        local storage. If None, remote storage will perform a one-time sync.
        """
        return self._pull_interval

    @property
    def destination(self) -> Path:
        """
        The local file path to pull contents from remote storage to.
        """
        return self._storage_base_path / self._remote_path

    @property
    def _remote_path(self) -> Path:
        """
        The URL's netloc and path joined as a relative path (e.g.
        `s3://bucket/folder` -> `bucket/folder`).
        """
        _, netloc, urlpath, _, _ = urlsplit(self._url)
        return Path(netloc) / Path(urlpath.lstrip("/"))

    async def pull_code(self):
        """
        Pulls contents from remote storage to the local filesystem.

        Raises:
            RuntimeError: If the fsspec `get` call fails; the original error is
                chained.
        """
        self._logger.debug(
            "Pulling contents from remote storage '%s' to '%s'...",
            self._url,
            self.destination,
        )

        if not self.destination.exists():
            self.destination.mkdir(parents=True, exist_ok=True)

        # Remote (fsspec) paths are "/"-separated; `str()` of a Path would use
        # "\\" separators on Windows.
        remote_path = self._remote_path.as_posix() + "/"

        try:
            await from_async.wait_for_call_in_new_thread(
                create_call(
                    self._filesystem.get,
                    remote_path,
                    str(self.destination),
                    recursive=True,
                )
            )
        except Exception as exc:
            raise RuntimeError(
                f"Failed to pull contents from remote storage {self._url!r} to"
                f" {self.destination!r}"
            ) from exc

    def to_pull_step(self) -> dict:
        """
        Returns a dictionary representation of the storage object that can be
        used as a deployment pull step.
        """

        def replace_block_with_placeholder(obj: Any) -> Any:
            if isinstance(obj, Block):
                return f"{{{{{obj.get_block_placeholder()}}}}}"
            return obj

        settings_with_placeholders = visit_collection(
            self._settings, replace_block_with_placeholder, return_data=True
        )
        required_package = self._get_required_package_for_scheme(
            urlparse(self._url).scheme
        )
        step = {
            "prefect.deployments.steps.pull_from_remote_storage": {
                "url": self._url,
                **settings_with_placeholders,
            }
        }
        if required_package:
            step["prefect.deployments.steps.pull_from_remote_storage"][
                "requires"
            ] = required_package
        return step

    def __eq__(self, __value) -> bool:
        """
        Equality check for runner storage objects.
        """
        if isinstance(__value, RemoteStorage):
            return self._url == __value._url and self._settings == __value._settings
        return False

    def __repr__(self) -> str:
        return f"RemoteStorage(url={self._url!r})"
async def pull_code(self):
    """
    Pulls contents from remote storage to the local filesystem.

    Raises:
        RuntimeError: If the fsspec `get` call fails; the original error is
            chained.
    """
    self._logger.debug(
        "Pulling contents from remote storage '%s' to '%s'...",
        self._url,
        self.destination,
    )

    if not self.destination.exists():
        self.destination.mkdir(parents=True, exist_ok=True)

    # Remote (fsspec) paths are "/"-separated; `str()` of a Path would use
    # "\\" separators on Windows.
    remote_path = self._remote_path.as_posix() + "/"

    try:
        await from_async.wait_for_call_in_new_thread(
            create_call(
                self._filesystem.get,
                remote_path,
                str(self.destination),
                recursive=True,
            )
        )
    except Exception as exc:
        raise RuntimeError(
            f"Failed to pull contents from remote storage {self._url!r} to"
            f" {self.destination!r}"
        ) from exc
def to_pull_step(self) -> dict:
    """
    Build a `pull_from_remote_storage` deployment pull step for this location.

    Block values anywhere in the fsspec settings are replaced by their block
    placeholder wrapped in double braces. When a package providing the URL
    scheme's filesystem can be determined, it is recorded under "requires".
    """

    def _as_placeholder(value: Any) -> Any:
        if not isinstance(value, Block):
            return value
        return f"{{{{{value.get_block_placeholder()}}}}}"

    templated_settings = visit_collection(
        self._settings, _as_placeholder, return_data=True
    )
    step_body = {"url": self._url, **templated_settings}

    package = self._get_required_package_for_scheme(urlparse(self._url).scheme)
    if package:
        step_body["requires"] = package

    return {"prefect.deployments.steps.pull_from_remote_storage": step_body}
@runtime_checkable
class RunnerStorage(Protocol):
    """
    Structural interface for objects a runner uses to fetch remotely stored
    flow code.

    The protocol is `runtime_checkable`, so `isinstance` can be used to check
    that an object provides these members.
    """

    def set_base_path(self, path: Path):
        """
        Set the local base directory beneath which remote contents are placed
        when pulled.
        """

    @property
    def pull_interval(self) -> Optional[int]:
        """
        Seconds between successive pulls from remote storage into local
        storage; None means a single one-time sync.
        """

    @property
    def destination(self) -> Path:
        """
        Local path that remote contents are pulled into.
        """

    async def pull_code(self):
        """
        Copy the remote contents into the local filesystem.
        """

    def to_pull_step(self) -> dict:
        """
        Express this storage object as a dictionary usable as a deployment
        pull step.
        """

    def __eq__(self, __value) -> bool:
        """
        Compare two runner storage objects for equality.
        """
def create_storage_from_url(
    url: str, pull_interval: Optional[int] = 60
) -> RunnerStorage:
    """
    Build a runner storage object appropriate for the given URL.

    A URL whose scheme is `git`, or whose path ends in `.git`, becomes a
    `GitRepository`; every other URL is treated as an `fsspec` location and
    becomes a `RemoteStorage`.

    Args:
        url: Git or `fsspec` URL to build the storage object from.
        pull_interval: Seconds between pulls from remote storage into local
            storage.

    Returns:
        RunnerStorage: A runner storage compatible object
    """
    parts = urlparse(url)
    looks_like_git = parts.scheme == "git" or parts.path.endswith(".git")
    storage_cls = GitRepository if looks_like_git else RemoteStorage
    return storage_cls(url=url, pull_interval=pull_interval)