Source code for gfluent.gcs

"""A fluent style Storage client
"""
import logging
import os
from typing import List

from google.cloud.exceptions import NotFound
from google.cloud import storage

logger = logging.getLogger(__name__)

[docs]class GCS(object): __required_setting = { "local": "local full path", "bucket": "the bucket name without gs://", "prefix": "remote prefix" } def __init__(self, project: str, **kwargs): if not isinstance(project, str) or project is None: raise ValueError("project id must be provided to init the BQ") self._project = project self._client = storage.Client(project=project) for attr in kwargs: if attr in GCS.__required_setting.keys(): getattr(self, attr)(kwargs[attr]) # setattr(self, f"_{attr}", kwargs[attr]) else: logger.warning(f"Ignored argument `{attr}`") def __repr__(self): return f"{GCS(project=self._project)}"
[docs] def local(self, path: str, suffix: str = None): """Specify the local path, could be a directory or a file :param path: directory or file :type path: str :param path: the suffix of included files :type path: str, Optional :raises ValueError: if path not found as a file or directory """ self._local = path if os.path.isfile(path): self._local_files = [path] elif os.path.isdir(path): if suffix: self._local_files = [os.path.join(path, x) for x in os.listdir(path) if x.endswith(suffix) and os.path.isfile(os.path.join(path, x))] else: self._local_files = [os.path.join(path, x) for x in os.listdir(path) if os.path.isfile(os.path.join(path, x))] else: raise ValueError(f"{path} is not a dir nor a file") return self
[docs] def bucket(self, bucket: str): """Specify the bucket name without gs:// :param bucket: bucket name without gs:// :type bucket: str """ if bucket.startswith("gs://"): self._bucket = bucket[5:] else: self._bucket = bucket return self
[docs] def prefix(self, prefix:str): """Specify the blob prefix :param prefix: without the ending / :type prefix: str """ self._prefix = prefix return self
[docs] def upload(self): """Upload file(s) to GCS with given prefix """ bucket = self._client.bucket(self._bucket) for f in self._local_files: basename = os.path.basename(f) blob = bucket.blob(f"{self._prefix}/{basename}") logger.info(f"uploading {f} to gs://{self._bucket}/{blob.name}") blob.upload_from_filename(f)
[docs] def download(self): """Download file from the given prefix to local folder The prefix of the blob object will be ignored, ``gs://bucket/folder1/abc.txt`` will be downloaded to ``/var/temp/abc.txt`` if the ``.local('var/temp')`` is set. """ if not os.path.isdir(self._local): raise ValueError(f"{self._local} must be a dir for download") bucket = self._client.bucket(self._bucket) blobs = bucket.list_blobs(prefix=self._prefix, delimiter='/') for blob in blobs: destination_uri = os.path.join(self._local, os.path.basename(blob.name)) logger.info(f"downloading {blob} to {destination_uri}") blob.download_to_filename(destination_uri)
def delete(self): if "_prefix" not in self.__dict__: raise ValueError("_prefix must be specified for delete()") bucket = self._client.bucket(self._bucket) blobs_to_delete = [blob for blob in bucket.list_blobs(prefix=self._prefix)] with self._client.batch(): for blob in blobs_to_delete: logger.warning(f"deleting gs://{self._bucket}/{blob.name}") blob.delete()