Skip to content

Commit

Permalink
[Taskserver] Enhance ABFS upload logic to handle chunk position tracking
Browse files (browse the repository at this point in the history)
Change-Id: Iab92b2c149e225090ae717eaba176a915db1e7ca
  • Loading branch information
Athithyaa Selvam committed Jun 27, 2024
1 parent 0c4f9d5 commit 9b21151
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions desktop/libs/azure/src/azure/abfs/upload.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -29,7 +29,7 @@
from filebrowser.utils import calculate_total_size, generate_chunks

if sys.version_info[0] > 2:
from io import StringIO as string_io
from io import BytesIO, StringIO as string_io
else:
from cStringIO import StringIO as string_io

Expand Down Expand Up @@ -99,18 +99,23 @@ def upload_chunks(self):
raise PopupException("ABFSFineUploaderChunkedUpload: Initiating ABFS upload to target path: %s failed %s" % (self.target_path, e))

try:
current_position = 0 # keeps track of position and uploaded_size
for i, (chunk, total) in enumerate(generate_chunks(self.qquuid, self.qqtotalparts, default_write_size=DEFAULT_WRITE_SIZE), 1):
LOG.debug("ABFSFineUploaderChunkedUpload: uploading file %s, part %d, size %d, dest: %s" %
(self.file_name, i, total, self.destination))
self._fs._append(self.target_path, chunk)
chunk_size = len(chunk.getvalue())
LOG.debug("ABFSFineUploaderChunkedUpload: uploading file %s, part %d, size %d, dest: %s, current_position: %d" %
(self.file_name, i, chunk_size, self.destination, current_position))
params = {'position': current_position}
self._fs._append(self.target_path, chunk, size=chunk_size, offset=0, params=params)
current_position += chunk_size

except Exception as e:
self._fs.remove(self.target_path)
LOG.exception('ABFSFineUploaderChunkedUpload: Failed to upload file to ABFS at %s: %s' % (self.target_path, e))
raise PopupException("ABFSFineUploaderChunkedUpload: S3FileUploadHandler uploading file %s part: %d failed" % (self.filepath, i))
finally:
# finish the upload
self._fs.flush(self.target_path, {'position': self.totalfilesize})
LOG.info("ABFSFineUploaderChunkedUpload: has completed file upload to ABFS, total file size is: %d." % self.totalfilesize)
# finish the upload using the tracked upload size
self._fs.flush(self.target_path, {'position': current_position})
LOG.info("ABFSFineUploaderChunkedUpload: has completed file upload to ABFS, total file size is: %d." % current_position)
LOG.debug("%s" % self._fs.stats(self.target_path))

def upload(self):
Expand Down Expand Up @@ -190,7 +195,8 @@ def receive_data_chunk(self, raw_data, start):
if self._is_abfs_upload():
try:
LOG.debug("ABFSFileUploadHandler uploading file part with size: %s" % self._part_size)
self._fs._append(self.target_path, raw_data, params={'position': int(start)})
buffered_data = BytesIO(raw_data)
self._fs._append(self.target_path, buffered_data, params={'position': int(start)})
return None
except Exception as e:
self._fs.remove(self.target_path)
Expand Down

0 comments on commit 9b21151

Please sign in to comment.