Restructured the fetch stage to be a Composite Beam transform and separated the request & download stage of fetching#148
Restructured the fetch stage to be a Composite Beam transform and separated the request & download stage of fetching#148mahrsee1997 wants to merge 8 commits intomainfrom
Conversation
…arated the request & download stage of fetching
alxmrs
left a comment
There was a problem hiding this comment.
Overall, the pipeline improvements look good! Here's my early feedback for the current draft.
| def fetch(self, dataset: str, selection: t.Dict) -> None: | ||
| pass | ||
|
|
||
| def download(self, dataset: str, result: t.Dict, output: str) -> None: | ||
| pass |
There was a problem hiding this comment.
These should raise a NotImplementedError.
| self._redirector.__exit__(exc_type, exc_value, traceback) | ||
|
|
||
|
|
||
| class APIRequestExtended(api.APIRequest): |
There was a problem hiding this comment.
How about we include "MARS" in the name of this class?
There was a problem hiding this comment.
Naming discussion. I kind of like "extended", but is there a better name? What about "Adapter"? Any other ideas?
There was a problem hiding this comment.
On second thought, adding "MARS" to the name of the class may not be necessary.
There was a problem hiding this comment.
Named SplitMARSRequest.
| def __init__(self, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) |
There was a problem hiding this comment.
You can omit the constructor if it just calls super.
| tries = 0 | ||
| while size != result["size"] and tries < 10: | ||
| size = self._transfer( | ||
| urljoin(self.url, result["href"]), target, result["size"] | ||
| ) | ||
| if size != result["size"] and tries < 10: | ||
| tries += 1 | ||
| self.log("Transfer interrupted, resuming in 60s...") | ||
| time.sleep(60) | ||
| else: | ||
| break |
There was a problem hiding this comment.
Ideally, we could update or replace this block with code that is capable of downloading more than 1 MB of data at a time. It'd be nice if we could do something like this, for example:
There was a problem hiding this comment.
Yes, necessary code changes have been done.
| open(target, "w").close() | ||
|
|
||
| size = -1 | ||
| tries = 0 |
There was a problem hiding this comment.
Following up with the comment below: We could use Beam's retry logic with exponential backoff (via the decorator) instead of re-using ECMWF's implementation. WDYT?
There was a problem hiding this comment.
Yes, necessary code changes have been done.
| with self.manifest.transact(config.selection, target, config.user_id): | ||
| with tempfile.NamedTemporaryFile() as temp: | ||
| logger.info(f'[{worker_name}] Fetching data for {target!r}.') | ||
| logger.info(f'[{worker_name}] Fetching and Downloading data for {target!r}.') |
There was a problem hiding this comment.
petty nit: can you use an & instead of "and" here :) ?
| with tempfile.NamedTemporaryFile() as temp: | ||
| logger.info(f'[{worker_name}] Fetching data for {target!r}.') | ||
| result = self.fetch(client, config.dataset, config.selection) | ||
| yield (result, config, worker_name, temp.name, target) |
There was a problem hiding this comment.
Consider creating an internal data class for passing along this result.
Also: Are you sure that using the temp.name is safe? It's possible that the temporary file will disappear.
Furthermore, looking at the code: do you need to create a temporary file here? If you don't need it for the fetch, it's probably safer to move this to the next stage.
There was a problem hiding this comment.
Maybe we don't need the dataclass since we can probably simplify what's returned. For example, We don't need to pass the target since that can be derived from the config. So, I see the tuple consisting of three parts: config, worker_name, result.
There was a problem hiding this comment.
(you can choose your favorite order for these).
| client = CLIENTS[self.client_name](config) | ||
| target = prepare_target_name(config) | ||
|
|
||
| with self.manifest.transact(config.selection, target, config.user_id): |
There was a problem hiding this comment.
A complication that I didn't anticipate until now: It would probably be best if we updated the manifest to distinguish between retrieved and downloaded. WDYT?
There was a problem hiding this comment.
Made necessary changes. Added a new class variable in DownloadStatus named stage which represents the current stage of the request.
| @retry_with_exponential_backoff | ||
| def upload(self, src: str, dest: str) -> None: | ||
| """Upload blob to cloud storage, with retries.""" | ||
| with io.FileIO(src, 'rb') as src_: |
There was a problem hiding this comment.
can this be with open(src, 'rb')?
| 'Default: make an educated guess per client & config. ' | ||
| 'Please see the client documentation for more details.') | ||
| parser.add_argument('-o', '--optimise-download', action='store_true', default=False, | ||
| help="Optimised the downloads.") |
There was a problem hiding this comment.
Make sure you cross apply the description of what this does here.
…r cosmetic changes.
…llelism and updated the manifest.
…e docstring in RetrieveData
I'm taking a leaf from @mahrsee1997's PR #148 so that we can copy data from the MARS server faster (using a larger buffer size). Thanks for the primary contribution here, Rahul. * restructured the fetch stage to be a Composite Beam transform and separated the request & download stage of fetching * retry logic of downloads for MARS client & other cosmetic changes. * Remove fetch / dl split * retrieve in two steps. * rm fetch + dl methods. * Fix: `nim_requests_per_key` does not require class construction. * fix lint: removed unused import. * add support for aria2 for faster download * code changes as per Alex feedback. Co-authored-by: mahrsee1997 <rahul@infocusp.in>
No description provided.