Skip to content

Commit

Permalink
Modify consumer to output paths to stored files from run command
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Nov 27, 2023
1 parent 21a1663 commit 9967881
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 89 deletions.
30 changes: 20 additions & 10 deletions src/nwp_consumer/cmd/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import structlog
from docopt import docopt
import pathlib

from nwp_consumer import internal
from nwp_consumer.internal import config, inputs, outputs
Expand All @@ -48,7 +49,7 @@
log = structlog.getLogger()


def run(arguments: dict) -> int:
def run(arguments: dict) -> tuple[list[pathlib.Path], list[pathlib.Path]]:
"""Run the CLI."""
# --- Map arguments to service configuration --- #

Expand Down Expand Up @@ -134,38 +135,44 @@ def run(arguments: dict) -> int:

# Logic for the "check" command
if arguments['check']:
return service.Check()
_ = service.Check()
return []

# Logic for the env command
if arguments['env']:
# Missing env vars are printed during mapping of source/sink args
return 0
return []

log.info("nwp-consumer starting", version=__version__, arguments=arguments)
log.info("nwp-consumer service starting", version=__version__, arguments=arguments)

rawFiles: list[pathlib.Path] = []
processedFiles: list[pathlib.Path] = []

if arguments['download']:
service.DownloadRawDataset(
rawFiles += service.DownloadRawDataset(
start=startDate,
end=endDate
)

if arguments['convert']:
service.ConvertRawDatasetToZarr(
processedFiles += service.ConvertRawDatasetToZarr(
start=startDate,
end=endDate
)

if arguments['consume']:
service.Check()
service.DownloadAndConvert(
r, p = service.DownloadAndConvert(
start=startDate,
end=endDate
)
rawFiles += r
processedFiles += p

if arguments['--create-latest']:
service.CreateLatestZarr()
processedFiles += service.CreateLatestZarr()

return 0
return rawFiles, processedFiles


def main() -> None:
Expand All @@ -176,7 +183,7 @@ def main() -> None:

programStartTime = dt.datetime.now()
try:
run(arguments=arguments)
files: tuple[list[pathlib.Path], list[pathlib.Path]] = run(arguments=arguments)
except Exception as e:
log.error("encountered error running nwp-consumer", error=str(e), exc_info=True)
erred = True
Expand All @@ -191,6 +198,9 @@ def main() -> None:
log.info(
"nwp-consumer finished",
elapsed_time=str(elapsedTime),
num_raw_files=len(files[0]),
num_processed_files=len(files[1]),
files_per_minute=(len(files[0]) + len(files[1])) / elapsedTime.total_seconds() * 60,
version=__version__
)
if erred:
Expand Down
2 changes: 1 addition & 1 deletion src/nwp_consumer/internal/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def exists(self, *, dst: pathlib.Path) -> bool:
pass

@abc.abstractmethod
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path:
"""Move the given temp file to the store at path p.
:param src: Path to temp file to move
Expand Down
19 changes: 17 additions & 2 deletions src/nwp_consumer/internal/outputs/huggingface/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,28 @@ def __init__(self, repoID: str, token: str | None = None, endpoint: str | None
def exists(self, *, dst: pathlib.Path) -> bool: # noqa: D102
return self.__fs.exists(self.datasetPath / dst.as_posix())

def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int: # noqa: D102
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path: # noqa: D102
self.__fs.put(
lpath=src.as_posix(),
rpath=(self.datasetPath / dst).as_posix(),
recursive=True
)
return self.__fs.du(path=(self.datasetPath / dst).as_posix())
nbytes = self.__fs.du(path=(self.datasetPath / dst).as_posix())
if nbytes != src.stat().st_size:
log.warn(
event="stored file size does not match source file size",
src=src.as_posix(),
dst=dst.as_posix(),
srcsize=src.stat().st_size,
dstsize=nbytes
)
else:
log.debug(
event="stored file",
filepath=dst.as_posix(),
nbytes=nbytes
)
return dst

def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]: # noqa: D102
allDirs = [
Expand Down
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/outputs/huggingface/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def test_store(self):
src = internal.TMP_DIR / f'nwpc-{uuid.uuid4()}'
src.write_bytes(bytes(filename, 'utf-8'))

n = self.client.store(src=src, dst=dst)
self.assertEqual(n, 30)
out = self.client.store(src=src, dst=dst)
self.assertEqual(out, dst)
self.assertTrue(self.mock_fs.put.called_with(src, dst))
self.assertTrue(self.mock_fs.du.called_with(dst))

Expand Down
29 changes: 19 additions & 10 deletions src/nwp_consumer/internal/outputs/localfs/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,34 @@
class Client(internal.StorageInterface):
"""Client for local filesystem."""

def exists(self, *, dst: pathlib.Path) -> bool:
def exists(self, *, dst: pathlib.Path) -> bool: # noqa: D102
return dst.exists()

def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int: # noqa: D102
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path: # noqa: D102
if src == dst:
return os.stat(src).st_size
return dst

dst.parent.mkdir(parents=True, exist_ok=True)
shutil.move(src=src, dst=dst)
# Do delete temp file here to avoid local duplication of file.
src.unlink(missing_ok=True)
nbytes = os.stat(dst).st_size
log.debug(
event="stored file locally",
src=src.as_posix(),
dst=dst.as_posix(),
nbytes=nbytes
)
return nbytes
if nbytes != dst.stat().st_size:
log.warn(
event="file size mismatch",
src=src.as_posix(),
dst=dst.as_posix(),
srcbytes=src.stat().st_size,
dstbytes=nbytes
)
else:
log.debug(
event="stored file locally",
src=src.as_posix(),
dst=dst.as_posix(),
nbytes=nbytes
)
return dst

def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]: # noqa: D102
# List all the inittime folders in the given directory
Expand Down
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/outputs/localfs/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ def test_store(self):
src.write_bytes(bytes("test_file_contents", 'utf-8'))

# Store the file using the function
size = self.testClient.store(src=src, dst=dst)
out = self.testClient.store(src=src, dst=dst)

# Assert that the file exists
self.assertTrue(dst.exists())
# Assert that the file has the correct size
self.assertEqual(size, 18)
self.assertEqual(out, dst)
# Assert that the temporary file has been deleted
self.assertFalse(src.exists())

Expand Down
35 changes: 22 additions & 13 deletions src/nwp_consumer/internal/outputs/s3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ class Client(internal.StorageInterface):
__fs: s3fs.S3FileSystem

def __init__(self, key: str, secret: str, bucket: str, region: str,
endpointURL: str = None) -> None:
endpointURL: str | None = None) -> None:
"""Create a new S3Client."""
(key, secret) = (None, None) if (key, secret) == ("", "") else (key, secret)
if key is None and secret is None:
(_key, _secret) = (None, None) if (key, secret) == ("", "") else (key, secret)
if _key is None and _secret is None:
log.info(
event="attempting AWS connection using default credentials",
)
Expand All @@ -39,10 +39,10 @@ def __init__(self, key: str, secret: str, bucket: str, region: str,

self.__bucket = pathlib.Path(bucket)

def exists(self, *, dst: pathlib.Path) -> bool:
def exists(self, *, dst: pathlib.Path) -> bool: # noqa: D102
return self.__fs.exists((self.__bucket / dst).as_posix())

def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path: # noqa: D102
log.debug(
event="storing file in s3",
src=src.as_posix(),
Expand All @@ -52,15 +52,24 @@ def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> int:
# Don't delete temp file as user may want to do further processing locally.
# All temp files are deleted at the end of the program.
nbytes = self.__fs.du((self.__bucket / dst).as_posix())
log.debug(
event="stored file in s3",
src=src.as_posix(),
dst=(self.__bucket / dst).as_posix(),
nbytes=nbytes
)
return nbytes
if nbytes != src.stat().st_size:
log.warn(
event="file size mismatch",
src=src.as_posix(),
dst=(self.__bucket / dst).as_posix(),
srcsize=src.stat().st_size,
dstsize=nbytes
)
else:
log.debug(
event="stored file in s3",
src=src.as_posix(),
dst=(self.__bucket / dst).as_posix(),
nbytes=nbytes
)
return dst

def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]:
def listInitTimes(self, *, prefix: pathlib.Path) -> list[dt.datetime]: # noqa: D102
allDirs = [
pathlib.Path(d).relative_to(self.__bucket / prefix)
for d in self.__fs.glob(f'{self.__bucket}/{prefix}/{internal.IT_FOLDER_GLOBSTR}')
Expand Down
4 changes: 2 additions & 2 deletions src/nwp_consumer/internal/outputs/s3/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_store(self):
# Write the data to the temporary file
src.write_bytes(bytes(fileName, 'utf-8'))

n = self.client.store(src=src, dst=dst)
name = self.client.store(src=src, dst=dst)

# Verify the written file in the raw directory
response = self.testS3.get_object(
Expand All @@ -117,7 +117,7 @@ def test_store(self):
self.assertEqual(response["Body"].read(), bytes(fileName, 'utf-8'))

# Verify the correct number of bytes was written
self.assertEqual(n, len(bytes(fileName, 'utf-8')))
self.assertEqual(name, dst)

# Delete the created file and the temp file
self.testS3.delete_object(
Expand Down
Loading

0 comments on commit 9967881

Please sign in to comment.