The upload-to-cloud stage includes some logging, a check to see whether the file has already been uploaded (which allows the process to be restarted if it is stopped for some reason), and then uploads the file to AWS Glacier: in 1MB chunks via multipart upload if it is over 1MB, or all at once if it is 1MB or less (part_size is 1MB):
...
Find.find(archive_path) do |local_file|
···
On Apr 15, 2016, at 11:58 AM, Eric Wong <normalperson@yhbt.net> wrote:
Josh Miller <joshua@itsecureadmin.com> wrote:
#
# check to see if we've already uploaded the file (recent archive index)
# - this is a hash that was read in from a CSV file before starting the Find.find...
#
@logger.info("checking old vault index for this file: #{local_file}")
if old_index_reference[local_file] == 1
@logger.debug("file exists in old vault index: #{local_file}")
upload_done = 1
end
if upload_done < 1
@logger.info("archiving #{local_file} to #{vault_name}")
archive_id = self.create_archive(local_file, vault_name, part_size, treehash_client)
#
# update s3 index which is a CSV file locally written until we’re finished
# - batch mode when operating on a directory
#
if archive_id.eql?("invalid")
@logger.info("invalid response, not writing: #{local_file},#{archive_id}")
else
@logger.debug("index file write: #{local_file},#{archive_id}")
file.write("#{epoch_time},#{local_file},#{archive_id}\n")
end
else
@logger.info("file already archived, skipping")
end
else
# must be a directory or sym-link
@logger.debug("#{local_file} not a file, not archiving")
end
end
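For reference, here is a minimal sketch of how an index hash like old_index_reference could be loaded from the CSV index before the Find.find loop starts (the file name and row layout here are hypothetical, matching the epoch_time,local_file,archive_id rows written above):

```ruby
require 'csv'

# Hypothetical sample index file; rows are epoch_time,local_file,archive_id
# to match the file.write above.
File.write('index.csv',
           "1460000000,/data/a.bin,abc123\n1460000100,/data/b.bin,def456\n")

old_index_reference = {}
CSV.foreach('index.csv') do |row|
  epoch_time, local_file, archive_id = row
  old_index_reference[local_file] = 1
end

# membership test -- note == (comparison), not = (assignment):
old_index_reference['/data/a.bin'] == 1  # => true
```

Since only membership matters, `old_index_reference.key?(local_file)` would read a bit more directly than comparing against 1.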
..
...and then the function create_archive would be:
…
def create_archive(filename, vault_name, part_size, treehash_client)
archive_id = "invalid"
self.create_or_verify_vault(vault_name)
File.open(filename, 'rb') do |file|
if file.size > part_size
@logger.info("File size over #{part_size} bytes, using multipart upload...")
mpu_create_response = @glacier_client.initiate_multipart_upload({
account_id: @account_id,
vault_name: vault_name,
archive_description: filename,
part_size: part_size,
})
total_parts = (file.size.to_f / part_size).ceil
current_part = 1
range_start = 0
range_end = 0
file.each_part do |part|
@logger.debug("part_size: #{part_size}")
@logger.debug("part.size: #{part.size}")
@logger.debug("range_start: #{range_start}")
@logger.debug("range_end: #{range_end}")
range_start = part_size * current_part - part_size
if part.size < part_size
range_end = range_start + part.size - 1
else
range_end = range_start + part_size - 1
end
range = "bytes #{range_start}-#{range_end}/*"
@logger.debug("part range: #{range}")
#
# add some logic to loop over part uploads and handle timeouts somewhat gracefully
#
restart_upload = 1
while restart_upload > 0
begin
part_response = @glacier_client.upload_multipart_part({
account_id: @account_id,
vault_name: vault_name,
upload_id: mpu_create_response.upload_id,
range: range,
body: part,
})
restart_upload = 0
rescue Aws::Glacier::Errors::RequestTimeoutException => rte
@logger.info("Upload timed out, restarting upload.")
restart_upload = 1
end
end
percent_complete = (current_part.to_f / total_parts.to_f) * 100
percent_complete = 100 if percent_complete > 100
percent_complete = sprintf('%.2f', percent_complete.to_f)
@logger.info("percent complete: #{percent_complete}")
current_part = current_part + 1
end # file.each_part do |part|
checksum = treehash_client.calculate_tree_hash(filename)
@logger.debug(YAML.dump(checksum))
mpu_complete_resp = @glacier_client.complete_multipart_upload({
account_id: @account_id,
vault_name: vault_name,
upload_id: mpu_create_response.upload_id,
archive_size: file.size,
checksum: checksum,
})
archive_id = mpu_complete_resp.archive_id
elsif file.size > 0 # not multipart, single file upload, but at least greater than 0 size
@logger.info("File size #{part_size} bytes or under, not using multipart upload: #{filename}")
begin
upload_archive_resp = @glacier_client.upload_archive({
vault_name: vault_name,
account_id: @account_id,
archive_description: filename,
body: file,
})
…exception handling here…omitted for brevity...
archive_id = upload_archive_resp.archive_id
else
@logger.info("File size 0, not archiving: #{filename}")
end # if file.size > part_size
@logger.info("uploaded #{filename} to #{vault_name} with archive_id of #{archive_id}")
end # File.open(filename, 'rb') do |file|
return archive_id
end
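As a side note, the range arithmetic in the part loop can be checked standalone; for a hypothetical 2.5 MB file with 1 MiB parts, the same start/end formulas produce:

```ruby
# Sketch: reproduce the multipart Content-Range math in isolation.
part_size = 1024 * 1024   # 1 MiB, matching the script's part_size
file_size = 2_500_000     # hypothetical file
ranges = []
remaining = file_size
current_part = 1
while remaining > 0
  range_start = part_size * current_part - part_size
  this_part = [part_size, remaining].min        # last part may be short
  range_end = range_start + this_part - 1
  ranges << "bytes #{range_start}-#{range_end}/*"
  remaining -= this_part
  current_part += 1
end
ranges
# => ["bytes 0-1048575/*", "bytes 1048576-2097151/*", "bytes 2097152-2499999/*"]
```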
I’ve overridden (monkey-patched) the File class with:
class File
def each_part(part_size=PART_SIZE)
yield read(part_size) until eof?
end
end
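A quick way to sanity-check that patch against a temp file (I pass part_size explicitly rather than relying on the PART_SIZE constant):

```ruby
require 'tempfile'

# Same monkey-patch as above, minus the PART_SIZE default.
class File
  def each_part(part_size)
    yield read(part_size) until eof?
  end
end

tmp = Tempfile.new('each-part-demo')
tmp.write('a' * 2_500_000)
tmp.close

sizes = []
File.open(tmp.path, 'rb') do |f|
  f.each_part(1024 * 1024) { |part| sizes << part.size }
end
sizes  # => [1048576, 1048576, 402848]
```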
Let me know if a better format would be more appropriate, and all tips / pointers appreciated.
Large allocations can cause memory fragmentation and really bad
growth. You can also try setting MALLOC_MMAP_THRESHOLD_=131072
(or similar number) for glibc malloc or try building with
jemalloc; but avoiding slurping at all is best.
ref: https://t-t-travails.blogspot.com/2009/05/mr-malloc-gets-schooled.html
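For example, the glibc tunable can be set per-invocation without touching the code (script name hypothetical):

```shell
MALLOC_MMAP_THRESHOLD_=131072 ruby glacier_upload.rb
```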
I will read through this, thanks.
Reading Find.find, it calls Dir.entries which wastes memory by
slurping the entire directory contents into memory (and doing it
recursively by going depth-first). If you have a flat directory
structure with lots of files at any level, this is going to be
ugly...
Perhaps try using the find(1) command to limit the number of
files which enters the Ruby process at once:
IO.popen(%W(find #{path} -print0)) do |rpipe|
recsep = "\0".freeze # for -print0
rpipe.each_line(recsep) do |line|
line.chomp!(recsep)
# upload to cloud # XXX make sure this doesn't slurp
end
end
find(1) seems to slurp entire directory listings into memory,
too, but I expect its internal data overhead is much less than
that of an array of Ruby strings which Find.find keeps.
Is there a better way to do this? Even generating a list of
filenames would be hundreds of MBs in size, which is something
I’ve considered as well.
General rule is that whenever you handle a large amount of data,
stream and process it in small chunks. I always consider the
data I intend to process, first, and build code around that;
never the other way around.
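As an illustration of that rule, a fixed-size read loop keeps memory flat no matter how large the input file is (a generic sketch, not taken from the script above):

```ruby
require 'tempfile'

# Write a file larger than one chunk, then stream it back 64 KiB at a time.
tmp = Tempfile.new('stream-demo')
tmp.write('x' * 200_000)
tmp.rewind

bytes_seen = 0
chunk_size = 64 * 1024
while (chunk = tmp.read(chunk_size))
  bytes_seen += chunk.bytesize  # process each chunk; never hold the whole file
end
bytes_seen  # => 200000
tmp.close!
```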
Thanks a lot,
Josh Miller
ITSA Consulting, LLC
https://itsecureadmin.com/