o y`s@sddlZddlmZddlmZmZddlmZddlm Z ddlm Z ddlm Z ddlm Z dd l mZdd l mZdd l mZmZGd d d eZGdddeZGdddeZGdddeZGdddeZGdddeZGddde ZGddde ZGddde ZdS)N)six)seekablereadable)IN_MEMORY_UPLOAD_TAG)Task)SubmissionTask)CreateMultipartUploadTask)CompleteMultipartUploadTask) get_callbacks)get_filtered_dict)DeferredOpenFileChunksizeAdjusterc@s.eZdZd ddZddZddZdd Zd S) AggregatedProgressCallbackcCs||_||_d|_dS)aAggregates progress updates for every provided progress callback :type callbacks: A list of functions that accepts bytes_transferred as a single argument :param callbacks: The callbacks to invoke when threshold is reached :type threshold: int :param threshold: The progress threshold in which to take the aggregated progress and invoke the progress callback with that aggregated progress total rN) _callbacks _threshold _bytes_seen)self callbacks thresholdr3/usr/lib/python3/dist-packages/s3transfer/upload.py__init__s  z#AggregatedProgressCallback.__init__cCs*|j|7_|j|jkr|dSdSN)rr_trigger_callbacks)rbytes_transferredrrr__call__-s  z#AggregatedProgressCallback.__call__cCs|jdkr |dSdS)z@Flushes out any progress that has not been sent to its callbacksrN)rrrrrrflush2s  z AggregatedProgressCallback.flushcCs"|jD]}||jdqd|_dS)N)rr)rr)rcallbackrrrr7s  z-AggregatedProgressCallback._trigger_callbacksN)r)__name__ __module__ __qualname__rrrrrrrrrs   rc@sLeZdZdZddZdddZddd Zd d Zd d ZddZ ddZ dS)InterruptReaderaWrapper that can interrupt reading using an error It uses a transfer coordinator to propagate an error if it notices that a read is being made while the file is being read from. :type fileobj: file-like obj :param fileobj: The file-like object to read from :type transfer_coordinator: s3transfer.futures.TransferCoordinator :param transfer_coordinator: The transfer coordinator to use if the reader needs to be interrupted. cCs||_||_dSr)_fileobj_transfer_coordinator)rfileobjtransfer_coordinatorrrrrJs zInterruptReader.__init__NcCs|jjr|jj|j|Sr)r% exceptionr$read)ramountrrrr)Ns zInterruptReader.readrcCs|j||dSr)r$seek)rwherewhencerrrr+XszInterruptReader.seekcCs |jSr)r$tellrrrrr.[s zInterruptReader.tellcCs|jdSr)r$closerrrrr/^zInterruptReader.closecCs|Srrrrrr __enter__azInterruptReader.__enter__cOs |dSr)r/)rargskwargsrrr__exit__d zInterruptReader.__exit__r)r) r r!r"__doc__rr)r+r.r/r1r5rrrrr#=s   r#c@sfeZdZdZdddZeddZddZd d Zd d Z d dZ ddZ ddZ ddZ ddZdS)UploadInputManageraJBase manager class for handling various types of files for uploads This class is typically used for the UploadSubmissionTask class to help determine the following: * How to determine the size of the file * How to determine if a multipart upload is required * How to retrieve the body for a PutObject * How to retrieve the bodies for a set of UploadParts The answers/implementations differ for the various types of file inputs that may be accepted. All implementations must subclass and override public methods from this class. NcCs||_||_||_dSr)_osutilr%_bandwidth_limiterrosutilr'bandwidth_limiterrrrrws zUploadInputManager.__init__cCtd)aDetermines if the source for the upload is compatible with manager :param upload_source: The source for which the upload will pull data from. :returns: True if the manager can handle the type of source specified otherwise returns False. zmust implement _is_compatible()NotImplementedErrorcls upload_sourcerrr is_compatible|s z UploadInputManager.is_compatiblecCr>)aWhether the body it provides are stored in-memory :type operation_name: str :param operation_name: The name of the client operation that the body is being used for. Valid operation_names are ``put_object`` and ``upload_part``. :rtype: boolean :returns: True if the body returned by the manager will be stored in memory. False if the manager will not directly store the body in memory. z%must implement store_body_in_memory())NotImplementedroperation_namerrrstores_body_in_memory z(UploadInputManager.stores_body_in_memorycCr>)zProvides the transfer size of an upload :type transfer_future: s3transfer.futures.TransferFuture :param transfer_future: The future associated with upload request z&must implement provide_transfer_size()r?rtransfer_futurerrrprovide_transfer_sizesz(UploadInputManager.provide_transfer_sizecCr>)aDetermines where a multipart upload is required :type transfer_future: s3transfer.futures.TransferFuture :param transfer_future: The future associated with upload request :type config: s3transfer.manager.TransferConfig :param config: The config associated to the transfer manager :rtype: boolean :returns: True, if the upload should be multipart based on configuartion and size. False, otherwise. z*must implement requires_multipart_upload()r?rrKconfigrrrrequires_multipart_uploadrIz,UploadInputManager.requires_multipart_uploadcCr>)aReturns the body to use for PutObject :type transfer_future: s3transfer.futures.TransferFuture :param transfer_future: The future associated with upload request :type config: s3transfer.manager.TransferConfig :param config: The config associated to the transfer manager :rtype: s3transfer.utils.ReadFileChunk :returns: A ReadFileChunk including all progress callbacks associated with the transfer future. z$must implement get_put_object_body()r?rJrrrget_put_object_bodyrIz&UploadInputManager.get_put_object_bodycCr>)aYields the part number and body to use for each UploadPart :type transfer_future: s3transfer.futures.TransferFuture :param transfer_future: The future associated with upload request :type chunksize: int :param chunksize: The chunksize to use for this upload. :rtype: int, s3transfer.utils.ReadFileChunk :returns: Yields the part number and the ReadFileChunk including all progress callbacks associated with the transfer future for that specific yielded part. z)must implement yield_upload_part_bodies()r?)rrK chunksizerrryield_upload_part_bodiessz+UploadInputManager.yield_upload_part_bodiescCs*t||j}|jr|jj||jdd}|S)NF)enabled)r#r%r:get_bandwith_limited_stream)rr&rrr _wrap_fileobjs z UploadInputManager._wrap_fileobjcCst|d}|r t|gSgS)Nprogress)r r)rrKrrrr_get_progress_callbackss  z*UploadInputManager._get_progress_callbackscCsdd|DS)NcSsg|]}|jqSr)r).0rrrr sz;UploadInputManager._get_close_callbacks..r)raggregated_progress_callbacksrrr_get_close_callbacksr0z'UploadInputManager._get_close_callbacksr)r r!r"r7r classmethodrDrHrLrOrPrRrUrWr[rrrrr8hs    r8c@sdeZdZdZeddZddZddZdd Zd d Z d d Z ddZ ddZ ddZ ddZdS)UploadFilenameInputManagerzUpload utility for filenamescCs t|tjSr) isinstancer string_typesrArrrrDs z(UploadFilenameInputManager.is_compatiblecCdS)NFrrFrrrrHr2z0UploadFilenameInputManager.stores_body_in_memorycCs|j|j|jjjdSr)metarLr9 get_file_size call_argsr&rJrrrrLs z0UploadFilenameInputManager.provide_transfer_sizecCs|jj|jkSr)rasizemultipart_thresholdrMrrrrOr0z4UploadFilenameInputManager.requires_multipart_uploadcCsJ||\}}||}||}||}|jj}|jj|||||dS)Nr& chunk_sizefull_file_sizerclose_callbacks)&_get_put_object_fileobj_with_full_sizerUrWr[rardr9#open_file_chunk_reader_from_fileobj)rrKr& full_sizerrirdrrrrPs   z.UploadFilenameInputManager.get_put_object_bodyc cs|jj}|||}td|dD]5}||}||}||d}|j|jjj|||d\} } | | } |j j | || ||d} || fVqdS)N) start_byte part_sizerhrf) rard_get_num_partsrangerWr['_get_upload_part_fileobj_with_full_sizercr&rUr9rk) rrKrQrh num_parts part_numberrrirnr&rlread_file_chunkrrrrRs&       z3UploadFilenameInputManager.yield_upload_part_bodiescCst|||jjd}|S)N) open_function)r r9open)rr&rnrrr_get_deferred_open_files z2UploadFilenameInputManager._get_deferred_open_filecCs"|jjj}|jj}||d|fS)Nr)rarcr&rdrxrrKr&rdrrrrj#s zAUploadFilenameInputManager._get_put_object_fileobj_with_full_sizecKs |d}|d}||||fS)Nrnrh)rx)rr&r4rnrlrrrrr(szBUploadFilenameInputManager._get_upload_part_fileobj_with_full_sizecCstt|jjt|Sr)intmathceilrardfloat)rrKrorrrrp-sz)UploadFilenameInputManager._get_num_partsN)r r!r"r7r\rDrHrLrOrPrRrxrjrrrprrrrr]s  r]c@s<eZdZdZeddZddZddZdd Zd d Z d S) UploadSeekableInputManagerz&Upload utility for an open file objectcCst|ot|Sr)rrrArrrrD4sz(UploadSeekableInputManager.is_compatiblecCs|dkrdSdS)N put_objectFTrrFrrrrH8sz0UploadSeekableInputManager.stores_body_in_memorycCsD|jjj}|}|dd|}|||j||dS)Nr)rarcr&r.r+rL)rrKr&start_position end_positionrrrrL>s   z0UploadSeekableInputManager.provide_transfer_sizecKs ||d}t|t|fS)Nro)r)rBytesIOlen)rr&r4datarrrrrJszBUploadSeekableInputManager._get_upload_part_fileobj_with_full_sizecCs"|jjj}||jj}||fSr)rarcr&r.rdryrrrrjYs zAUploadSeekableInputManager._get_put_object_fileobj_with_full_sizeN) r r!r"r7r\rDrHrLrrrjrrrrr~2s  r~csheZdZdZdfdd ZeddZddZd d Zd d Z d dZ ddZ dddZ ddZ ZS)UploadNonSeekableInputManagerz7Upload utility for a file-like object that cannot seek.Ncstt||||d|_dS)N)superrr _initial_datar; __class__rrrcs  z&UploadNonSeekableInputManager.__init__cCst|Sr)rrArrrrDhsz+UploadNonSeekableInputManager.is_compatiblecCr`)NTrrFrrrrHlr2z3UploadNonSeekableInputManager.stores_body_in_memorycCsdSrrrJrrrrLosz3UploadNonSeekableInputManager.provide_transfer_sizecCsP|jjdur |jj|jkS|jjj}|j}|||d|_t|j|kr&dSdS)NFT)rardrercr&_readrr)rrKrNr&rrrrrOts  z7UploadNonSeekableInputManager.requires_multipart_uploadcCs@||}||}|jjj}||j|||}d|_|Sr)rWr[rarcr& _wrap_datarr))rrKrrir&bodyrrrrPs   z1UploadNonSeekableInputManager.get_put_object_bodyc cs`|jjj}d} ||}||}|d7}|||}|s!dS||||}d}||fVq )NrTrm)rarcr&rWr[rr) rrKrQ file_objectrtrri part_content part_objectrrrrRs      z6UploadNonSeekableInputManager.yield_upload_part_bodiesTcCsxt|jdkr ||S|t|jkr&|jd|}|r$|j|d|_|S|t|j}|j||}|r:d|_|S)a= Reads a specific amount of data from a stream and returns it. If there is any data in initial_data, that will be popped out first. :type fileobj: A file-like object that implements read :param fileobj: The stream to read from. :type amount: int :param amount: The number of bytes to read from the stream. :type truncate: bool :param truncate: Whether or not to truncate initial_data after reading from it. :return: Generator which generates part bodies from the initial data. rNr)rrr))rr&r*truncateramount_to_readrrrrs z#UploadNonSeekableInputManager._readcCs.|t|}|jj|t|t|||dS)a Wraps data with the interrupt reader and the file chunk reader. :type data: bytes :param data: The data to wrap. :type callbacks: list :param callbacks: The callbacks associated with the transfer future. :type close_callbacks: list :param close_callbacks: The callbacks to be called when closing the wrapper for the data. :return: Fully wrapped data. rf)rUrrr9rkr)rrrrir&rrrrs z(UploadNonSeekableInputManager._wrap_datar)T)r r!r"r7rr\rDrHrLrOrPrRrr __classcell__rrrrras  *rc@s\eZdZdZgdZddgZddZ ddd Zd d Zd d Z ddZ ddZ ddZ dS)UploadSubmissionTaskz.Task for submitting tasks to execute an upload)SSECustomerKeySSECustomerAlgorithmSSECustomerKeyMD5 RequestPayerExpectedBucketOwnerrrcCsDtttg}|jjj}|D] }||r|Sq td|t|f)aoRetrieves a class for managing input for an upload based on file type :type transfer_future: s3transfer.futures.TransferFuture :param transfer_future: The transfer future for the request :rtype: class of UploadInputManager :returns: The appropriate class to use for managing a specific type of input for uploads. z&Input %s of type: %s is not supported.) r]r~rrarcr&rD RuntimeErrortype)rrKupload_manager_resolver_chainr&upload_manager_clsrrr_get_upload_input_manager_clss   z2UploadSubmissionTask._get_upload_input_manager_clsNcCsf||||j|}|jjdur|||||s'|||||||dS|||||||dS)a :param client: The client associated with the transfer manager :type config: s3transfer.manager.TransferConfig :param config: The transfer config associated with the transfer manager :type osutil: s3transfer.utils.OSUtil :param osutil: The os utility associated to the transfer manager :type request_executor: s3transfer.futures.BoundedExecutor :param request_executor: The request executor associated with the transfer manager :type transfer_future: s3transfer.futures.TransferFuture :param transfer_future: The transfer future associated with the transfer request that tasks are being submitted for N)rr%rardrLrO_submit_upload_request_submit_multipart_request)rclientrNr<request_executorrKr=upload_input_managerrrr_submit s$    zUploadSubmissionTask._submitc CsN|jj}||d}|jj|t|j||||j|j|j ddd|ddS)Nr)rr&bucketkey extra_argsT)r' main_kwargsis_finaltag) rarc_get_upload_task_tagr%submit PutObjectTaskrPrrr) rrrNr<rrKrrcput_object_tagrrrr4s( z+UploadSubmissionTask._submit_upload_requestcCs|jj}|j|t|j||j|j|jdd}g} ||j} | |d} |jj } t } | |j | }|||}|D]!\}}| |jj|t|j|||j|j|| dd|id| dq<||j}|j|t|j||j|j|d|| dd d dS) N)rrrr)r'r upload_part)rr&rrrtr upload_id)r'rpending_main_kwargsr)rpartsT)r'rrr)rarcr%rrrrr_extra_upload_part_argsrrdr adjust_chunksizemultipart_chunksizerRappendUploadPartTask_extra_complete_multipart_argsr )rrrNr<rrKrrccreate_multipart_future part_futuresextra_part_argsupload_part_tagrdadjusterrQ part_iteratorrtr&complete_multipart_extra_argsrrrrNsx   z.UploadSubmissionTask._submit_multipart_requestcC t||jSr)r UPLOAD_PART_ARGSrrrrrrs z,UploadSubmissionTask._extra_upload_part_argscCrr)r COMPLETE_MULTIPART_ARGSrrrrrr6z3UploadSubmissionTask._extra_complete_multipart_argscCsd}||r t}|Sr)rHr)rrrGrrrrrs z)UploadSubmissionTask._get_upload_task_tagr) r r!r"r7rrrrrrrrrrrrrrs  'M rc@eZdZdZddZdS)rz Task to do a nonmultipart uploadcCsB|}|jd|||d|WddS1swYdS)aP :param client: The client to use when calling PutObject :param fileobj: The file to upload. :param bucket: The name of the bucket to upload to :param key: The name of the key to upload to :param extra_args: A dictionary of any extra arguments that may be used in the upload. )BucketKeyBodyNr)r)rrr&rrrrrrr_mains "zPutObjectTask._mainNr r!r"r7rrrrrr rc@r)rz+Task to upload a part in a multipart uploadc CsR|}|jd|||||d|} Wdn1swY| d} | |dS)a :param client: The client to use when calling PutObject :param fileobj: The file to upload. :param bucket: The name of the bucket to upload to :param key: The name of the key to upload to :param upload_id: The id of the upload :param part_number: The number representing the part of the multipart upload :param extra_args: A dictionary of any extra arguments that may be used in the upload. :rtype: dict :returns: A dictionary representing a part:: {'Etag': etag_value, 'PartNumber': part_number} This value can be appended to a list to be used to complete the multipart upload. )rrUploadId PartNumberrNETag)rrr)r) rrr&rrrrtrrresponseetagrrrrs zUploadPartTask._mainNrrrrrrrr)r{botocore.compatrs3transfer.compatrrs3transfer.futuresrs3transfer.tasksrrrr s3transfer.utilsr r r r objectrr#r8r]r~rrrrrrrrs,        !+yQ/F