import os
import math
import threading
import hashlib
import time
import logging
import binascii

from boto.compat import Queue
# Empty is needed for the timed get() in TransferThread.run() below.
from queue import Empty

from boto.glacier.utils import DEFAULT_PART_SIZE, minimum_part_size, \
    chunk_hashes, tree_hash, bytes_to_hex
from boto.glacier.exceptions import UploadArchiveError, \
    DownloadArchiveError, TreeHashDoesNotMatchError


_END_SENTINEL = object()
log = logging.getLogger('boto.glacier.concurrent')


class ConcurrentTransferer(object):
    def __init__(self, part_size=DEFAULT_PART_SIZE, num_threads=10):
        self._part_size = part_size
        self._num_threads = num_threads
        self._threads = []

    def _calculate_required_part_size(self, total_size):
        min_part_size_required = minimum_part_size(total_size)
        if self._part_size >= min_part_size_required:
            part_size = self._part_size
        else:
            part_size = min_part_size_required
            log.debug("The part size specified (%s) is smaller than "
                      "the minimum required part size.  Using a part "
                      "size of: %s", self._part_size, part_size)
        total_parts = int(math.ceil(total_size / float(part_size)))
        return total_parts, part_size

    def _shutdown_threads(self):
        log.debug("Shutting down threads.")
        for thread in self._threads:
            thread.should_continue = False
        for thread in self._threads:
            thread.join()
        log.debug("Threads have exited.")

    def _add_work_items_to_queue(self, total_parts, worker_queue, part_size):
        log.debug("Adding work items to queue.")
        for i in range(total_parts):
            worker_queue.put((i, part_size))
        # One sentinel per worker thread tells the pool when to shut down.
        for i in range(self._num_threads):
            worker_queue.put(_END_SENTINEL)


class ConcurrentUploader(ConcurrentTransferer):
    """Concurrently upload an archive to glacier.

    This class uses a thread pool to concurrently upload an archive
    to glacier using the multipart upload API.

    The threadpool is completely managed by this class and is
    transparent to the users of this class.

    """
    def __init__(self, api, vault_name, part_size=DEFAULT_PART_SIZE,
                 num_threads=10):
        """
        :type api: :class:`boto.glacier.layer1.Layer1`
        :param api: A layer1 glacier object.

        :type vault_name: str
        :param vault_name: The name of the vault.

        :type part_size: int
        :param part_size: The size, in bytes, of the chunks to use when
            uploading the archive parts.  The part size must be a megabyte
            multiplied by a power of two.

        :type num_threads: int
        :param num_threads: The number of threads to spawn for the thread
            pool.  The number of threads controls how many parts are being
            concurrently uploaded.

        """
        super(ConcurrentUploader, self).__init__(part_size, num_threads)
        self._api = api
        self._vault_name = vault_name

    def upload(self, filename, description=None):
        """Concurrently create an archive.

        The part_size value specified when the class was constructed
        will be used *unless* it is smaller than the minimum required
        part size needed for the size of the given file.  In that case,
        the part size used will be the minimum part size required
        to properly upload the given file.

        :type filename: str
        :param filename: The filename to upload.

        :type description: str
        :param description: The description of the archive.

        :rtype: str
        :return: The archive id of the newly created archive.

        """
        total_size = os.stat(filename).st_size
        total_parts, part_size = self._calculate_required_part_size(
            total_size)
        hash_chunks = [None] * total_parts
        worker_queue = Queue()
        result_queue = Queue()
        response = self._api.initiate_multipart_upload(self._vault_name,
                                                       part_size, description)
        upload_id = response['UploadId']
        # Fill the work queue with (part_number, part_size) items, start the
        # thread pool, and collect the per-part tree hashes from the result
        # queue so that the multipart upload can be completed.
        self._add_work_items_to_queue(total_parts, worker_queue, part_size)
        self._start_upload_threads(result_queue, upload_id, worker_queue,
                                   filename)
        try:
            self._wait_for_upload_threads(hash_chunks, result_queue,
                                          total_parts)
        except UploadArchiveError as e:
            log.debug("An error occurred while uploading an archive, "
                      "aborting multipart upload.")
            self._api.abort_multipart_upload(self._vault_name, upload_id)
            raise e
        log.debug("Completing upload.")
        response = self._api.complete_multipart_upload(
            self._vault_name, upload_id,
            bytes_to_hex(tree_hash(hash_chunks)), total_size)
        log.debug("Upload finished.")
        return response['ArchiveId']

    def _wait_for_upload_threads(self, hash_chunks, result_queue,
                                 total_parts):
        for _ in range(total_parts):
            result = result_queue.get()
            if isinstance(result, Exception):
                log.debug("An error was found in the result queue, "
                          "terminating threads: %s", result)
                self._shutdown_threads()
                raise UploadArchiveError("An error occurred while uploading "
                                         "an archive: %s" % result)
            part_number, tree_sha256 = result
            hash_chunks[part_number] = tree_sha256
        self._shutdown_threads()

    def _start_upload_threads(self, result_queue, upload_id, worker_queue,
                              filename):
        log.debug("Starting threads.")
        for _ in range(self._num_threads):
            thread = UploadWorkerThread(self._api, self._vault_name, filename,
                                        upload_id, worker_queue, result_queue)
            time.sleep(0.2)
            thread.start()
            self._threads.append(thread)


class TransferThread(threading.Thread):
    def __init__(self, worker_queue, result_queue):
        super(TransferThread, self).__init__()
        self._worker_queue = worker_queue
        self._result_queue = result_queue
        # This value can be set to False externally to signal that the
        # thread should stop processing work and exit.
        self.should_continue = True

    def run(self):
        while self.should_continue:
            try:
                work = self._worker_queue.get(timeout=1)
            except Empty:
                continue
            if work is _END_SENTINEL:
                self._cleanup()
                return
            result = self._process_chunk(work)
            self._result_queue.put(result)
        self._cleanup()

    def _process_chunk(self, work):
        pass

    def _cleanup(self):
        pass


class UploadWorkerThread(TransferThread):
    def __init__(self, api, vault_name, filename, upload_id,
                 worker_queue, result_queue, num_retries=5,
                 time_between_retries=5, retry_exceptions=Exception):
        super(UploadWorkerThread, self).__init__(worker_queue, result_queue)
        self._api = api
        self._vault_name = vault_name
        self._filename = filename
        self._fileobj = open(filename, 'rb')
        self._upload_id = upload_id
        self._num_retries = num_retries
        self._time_between_retries = time_between_retries
        self._retry_exceptions = retry_exceptions

    def _process_chunk(self, work):
        result = None
        for i in range(self._num_retries + 1):
            try:
                result = self._upload_chunk(work)
                break
            except self._retry_exceptions as e:
                log.error("Exception caught uploading part number %s for "
                          "vault %s, attempt: (%s / %s), filename: %s, "
                          "exception: %s, msg: %s",
                          work[0], self._vault_name, i + 1, self._num_retries,
                          self._filename, e.__class__, e)
                time.sleep(self._time_between_retries)
                result = e
        return result

    def _upload_chunk(self, work):
        part_number, part_size = work
        start_byte = part_number * part_size
        self._fileobj.seek(start_byte)
        contents = self._fileobj.read(part_size)
        linear_hash = hashlib.sha256(contents).hexdigest()
        tree_hash_bytes = tree_hash(chunk_hashes(contents))
        byte_range = (start_byte, start_byte + len(contents) - 1)
        log.debug("Uploading chunk %s of size %s", part_number, part_size)
        self._api.upload_part(self._vault_name, self._upload_id,
                              linear_hash, bytes_to_hex(tree_hash_bytes),
                              byte_range, contents)
        return (part_number, tree_hash_bytes)

    def _cleanup(self):
        self._fileobj.close()


class ConcurrentDownloader(ConcurrentTransferer):
    """
    Concurrently download an archive from glacier.

    This class uses a thread pool to concurrently download an archive
    from glacier.

    The threadpool is completely managed by this class and is
    transparent to the users of this class.

    """
    def __init__(self, job, part_size=DEFAULT_PART_SIZE, num_threads=10):
        """
        :param job: A layer2 job object for an archive retrieval.

        :param part_size: The size, in bytes, of the chunks to use when
            downloading the archive parts.  The part size must be a megabyte
            multiplied by a power of two.

        """
        super(ConcurrentDownloader, self).__init__(part_size, num_threads)
        self._job = job

    def download(self, filename):
        """
        Concurrently download an archive.

        :param filename: The filename to download the archive to
        :type filename: str

        """
        total_size = self._job.archive_size
        total_parts, part_size = self._calculate_required_part_size(
            total_size)
        worker_queue = Queue()
        result_queue = Queue()
        self._add_work_items_to_queue(total_parts, worker_queue, part_size)
        self._start_download_threads(result_queue, worker_queue)
        try:
            self._wait_for_download_threads(filename, result_queue,
                                            total_parts)
        except DownloadArchiveError as e:
            log.debug("An error occurred while downloading an archive: %s", e)
            raise e
        log.debug("Download completed.")

    def _wait_for_download_threads(self, filename, result_queue, total_parts):
        """
        Wait until the result_queue is filled with all the downloaded parts,
        which indicates that all part downloads have completed, and save the
        downloaded parts into filename.

        :param filename: Destination file for the assembled archive.
        :param result_queue: Queue the worker threads put their results on.
        :param total_parts: Total number of parts expected.
        """
        hash_chunks = [None] * total_parts
        with open(filename, "wb") as f:
            for _ in range(total_parts):
                result = result_queue.get()
                if isinstance(result, Exception):
                    log.debug("An error was found in the result queue, "
                              "terminating threads: %s", result)
                    self._shutdown_threads()
                    raise DownloadArchiveError(
                        "An error occurred while downloading an archive: %s"
                        % result)
                part_number, part_size, actual_hash, data = result
                hash_chunks[part_number] = actual_hash
                start_byte = part_number * part_size
                f.seek(start_byte)
                f.write(data)
                f.flush()
        final_hash = bytes_to_hex(tree_hash(hash_chunks))
        log.debug("Verifying final tree hash of archive, expecting: %s, "
                  "actual: %s", self._job.sha256_treehash, final_hash)
        if self._job.sha256_treehash != final_hash:
            self._shutdown_threads()
            raise TreeHashDoesNotMatchError(
                "Tree hash for entire archive does not match, "
                "expected: %s, got: %s" % (self._job.sha256_treehash,
                                           final_hash))
        self._shutdown_threads()

    def _start_download_threads(self, result_queue, worker_queue):
        log.debug("Starting threads.")
        for _ in range(self._num_threads):
            thread = DownloadWorkerThread(self._job, worker_queue,
                                          result_queue)
            time.sleep(0.2)
            thread.start()
            self._threads.append(thread)


class DownloadWorkerThread(TransferThread):
    def __init__(self, job, worker_queue, result_queue, num_retries=5,
                 time_between_retries=5, retry_exceptions=Exception):
        """
        Individual download thread that will download parts of the file from
        Glacier.  Parts to download are stored in the work queue.

        Each downloaded part is placed on the result queue along with its
        part number.

        :param job: Glacier job object
        :param worker_queue: A queue of tuples which include the part_number
            and part_size
        :param result_queue: A queue of tuples which include the part_number,
            part_size, tree hash, and data of each downloaded part.

        """
        super(DownloadWorkerThread, self).__init__(worker_queue, result_queue)
        self._job = job
        self._num_retries = num_retries
        self._time_between_retries = time_between_retries
        self._retry_exceptions = retry_exceptions

    def _process_chunk(self, work):
        """
        Attempt to download a part of the archive from Glacier and store the
        result in the result_queue.

        :param work:
        """
        result = None
        for _ in range(self._num_retries):
            try:
                result = self._download_chunk(work)
                break
            except self._retry_exceptions as e:
                log.error("Exception caught downloading part number %s for "
                          "job %s", work[0], self._job)
                time.sleep(self._time_between_retries)
                result = e
        return result

    def _download_chunk(self, work):
        """
        Download a chunk of the archive from Glacier, verify its tree hash,
        and return the part number, part size, tree hash, and data.

        :param work:
        """
        part_number, part_size = work
        start_byte = part_number * part_size
        byte_range = (start_byte, start_byte + part_size - 1)
        log.debug("Downloading chunk %s of size %s", part_number, part_size)
        response = self._job.get_output(byte_range)
        data = response.read()
        actual_hash = bytes_to_hex(tree_hash(chunk_hashes(data)))
        if response['TreeHash'] != actual_hash:
            raise TreeHashDoesNotMatchError(
                "Tree hash for part number %s does not match, "
                "expected: %s, got: %s" % (part_number,
                                           response['TreeHash'],
                                           actual_hash))
        return (part_number, part_size, binascii.unhexlify(actual_hash), data)
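

if __name__ == '__main__':
    # Illustrative usage sketch: how the classes above are typically driven.
    # The vault name and filenames are hypothetical placeholders, and running
    # this for real assumes AWS credentials configured for boto and an
    # existing Glacier vault.
    from boto.glacier.layer1 import Layer1

    api = Layer1()  # credentials are read from the usual boto config/env
    uploader = ConcurrentUploader(api, 'my-vault', num_threads=4)
    archive_id = uploader.upload('backup.tar.gz', 'nightly backup')
    print('Created archive: %s' % archive_id)

    # Downloading requires a completed archive-retrieval job (a layer2 Job
    # object exposing archive_size, sha256_treehash, and get_output());
    # ConcurrentDownloader then fetches the parts in parallel, e.g.:
    #     downloader = ConcurrentDownloader(job, num_threads=4)
    #     downloader.download('restored.tar.gz')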