o [d@sdddgZddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl m Z ddl m Z m Z ddlmZdZd Zd Zd ZeZd d ZddZGdddeZGdddZddZGdddeZ  d*ddZddZGdddeZ Gd dde!Z"Gd!d"d"e!Z#e#Z$Gd#d$d$e#Z%Gd%d&d&e!Z&Gd'd(d(e&Z'Gd)dde"Z(dS)+Pool ThreadPoolN)util) get_context TimeoutError)waitINITRUNCLOSE TERMINATEcCs tt|SN)listmapargsr+/usr/lib/python3.10/multiprocessing/pool.pymapstar/ rcCstt|d|dS)Nrr)r itertoolsstarmaprrrr starmapstar2src@eZdZddZddZdS)RemoteTracebackcCs ||_dSr tb)selfrrrr__init__: zRemoteTraceback.__init__cCs|jSr rrrrr__str__<szRemoteTraceback.__str__N)__name__ __module__ __qualname__rr!rrrrr9s rc@r)ExceptionWithTracebackcCs0tt|||}d|}||_d||_dS)Nz """ %s""") tracebackformat_exceptiontypejoinexcr)rr+rrrrr@s zExceptionWithTraceback.__init__cCst|j|jffSr ) rebuild_excr+rr rrr __reduce__Ez!ExceptionWithTraceback.__reduce__N)r"r#r$rr-rrrrr%?s r%cCst||_|Sr )r __cause__)r+rrrrr,Hs r,cs0eZdZdZfddZddZddZZS)MaybeEncodingErrorzVWraps possible unpickleable errors, so they can be safely sent through the socket.cs.t||_t||_tt||j|jdSr )reprr+valuesuperr0r)rr+r2 __class__rrrTs  zMaybeEncodingError.__init__cCsd|j|jfS)Nz(Error sending result: '%s'. Reason: '%s')r2r+r rrrr!YszMaybeEncodingError.__str__cCsd|jj|fS)Nz<%s: %s>)r5r"r rrr__repr__]r.zMaybeEncodingError.__repr__)r"r#r$__doc__rr!r6 __classcell__rrr4rr0Ps  r0rFc Cs|durt|tr |dkstd||j}|j}t|dr)|j|j |dur1||d}|dus=|r||krz|} Wnt t fyRt dYnw| dur]t dn| \} } } } }z d| | i|f}Wn"ty}z|r| turt||j}d|f}WYd}~nd}~wwz || | |fWn)ty}zt||d}t d ||| | d|ffWYd}~nd}~wwd} } }} } }|d7}|dus=|r||ks=t d |dS) NrzMaxtasks {!r} is not valid_writerrz)worker got EOFError or OSError -- exitingzworker got sentinel -- exitingTFz0Possible encoding error while sending result: %szworker exiting after %d tasks) isinstanceintAssertionErrorformatputgethasattrr9close_readerEOFErrorOSErrorrdebug Exception_helper_reraises_exceptionr% __traceback__r0)inqueueoutqueue initializerinitargsmaxtaskswrap_exceptionr>r? completedtaskjobifuncrkwdsresultewrappedrrrworkerasX        rXcCs|)z@Pickle-able helper function for use by _guarded_task_generation.r)exrrrrGrGcs2eZdZdZddfdd ZfddZZS) _PoolCachez Class that implements a cache for the Pool class that will notify the pool management threads every time the cache is emptied. The notification is done by the use of a queue that is provided when instantiating the cache. Nnotifiercs||_tj|i|dSr )r]r3r)rr]rrTr4rrrsz_PoolCache.__init__cs$t||s|jddSdSr )r3 __delitem__r]r>)ritemr4rrr^s z_PoolCache.__delitem__)r"r#r$r7rr^r8rrr4rr[sr[c@seZdZdZdZeddZ  dLddZej e fd d Z d d Z d dZ eddZeddZddZeddZeddZddZddZdifddZdMdd ZdMd!d"Z  dNd#d$Zd%d&ZdOd(d)ZdOd*d+Zdiddfd,d-Z  dNd.d/Z  dNd0d1ZedMd2d3Ze d4d5Z!ed6d7Z"ed8d9Z#ed:d;Z$dd?Z&d@dAZ'dBdCZ(edDdEZ)e dFdGZ*dHdIZ+dJdKZ,dS)PrzS Class which supports an async version of applying functions to arguments. TcOs|j|i|Sr Process)ctxrrTrrrrasz Pool.ProcessNrcCs0g|_t|_|p t|_|t|_|j|_ t |j d|_ ||_ ||_ ||_|dur5tp4d}|dkr=td|durNt|trJ|dkrNtd|durZt|sZtd||_z|Wn!ty|jD] }|jdurx|qm|jD]}|q|w|}tjtj |j |j|j|j!|j|j|j"|j#|j |j|j |j$||j fd|_%d|j%_&t'|j%_|j%(tjtj)|j|j*|j#|j|j fd|_+d|j+_&t'|j+_|j+(tjtj,|j#|j-|j fd|_.d|j._&t'|j._|j.(t/j0||j1|j|j"|j#|j|j |j%|j+|j.|j f d d |_2t'|_dS) Nr\rz&Number of processes must be at least 1rz/maxtasksperchild must be a positive int or Nonezinitializer must be a callabletargetrT)r exitpriority)3_poolr _stater_ctx _setup_queuesqueue SimpleQueue _taskqueue_change_notifierr[_cache_maxtasksperchild _initializer _initargsos cpu_count ValueErrorr:r;callable TypeError _processes_repopulate_poolrFexitcode terminater*_get_sentinels threadingThreadr_handle_workersra_inqueue _outqueue_wrap_exception_worker_handlerdaemonr start _handle_tasks _quick_put _task_handler_handle_results _quick_get_result_handlerrFinalize_terminate_pool _terminate)r processesrKrLmaxtasksperchildcontextp sentinelsrrrrs                  z Pool.__init__cCsF|j|kr|d|t|dt|dddur!|jddSdSdS)Nz&unclosed running multiprocessing pool )sourcern)rhResourceWarninggetattrrnr>)r_warnr rrr__del__ s  z Pool.__del__c Cs0|j}d|jd|jd|jdt|jd S)N<.z state=z pool_size=>)r5r#r$rhlenrg)rclsrrrr6s z Pool.__repr__cCs |jjg}|jjg}g||Sr )rrBrn)rtask_queue_sentinelsself_notifier_sentinelsrrrr|s   zPool._get_sentinelscCsdd|DS)NcSsg|] }t|dr|jqS)sentinel)r@r).0rXrrr s z.Pool._get_worker_sentinels..rworkersrrr_get_worker_sentinelsszPool._get_worker_sentinelscCsPd}ttt|D]}||}|jdur%td||d}||=q |S)zCleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. FNcleaning up worker %dT)reversedrangerrzrrEr*)poolcleanedrRrXrrr_join_exited_workers!s zPool._join_exited_workersc Cs0||j|j|j|j|j|j|j|j|j |j Sr ) _repopulate_pool_staticrirarxrgrrrqrrrprr rrrry1s zPool._repopulate_poolc Csft|t|D](} ||t|||||| fd} | jdd| _d| _| || t dqdS)zBring the number of pool processes up to the specified number, for use after reaping workers which have exited. rcra PoolWorkerTz added workerN) rrrXnamereplacerrappendrrE) rbrarrrIrJrKrLrrNrRwrrrr:s  zPool._repopulate_pool_staticc Cs.t|rt|||||||||| dSdS)zEClean up any exited workers and start replacements for them. N)rrr) rbrarrrIrJrKrLrrNrrr_maintain_poolMs  zPool._maintain_poolcCs4|j|_|j|_|jjj|_|jjj|_ dSr ) rirlrrr9sendrrBrecvrr rrrrjYs   zPool._setup_queuescCs|jtkr tddS)NzPool not running)rhr rur rrr_check_running_s zPool._check_runningcCs||||S)zT Equivalent of `func(*args, **kwds)`. Pool must be running. ) apply_asyncr?)rrSrrTrrrapplycsz Pool.applycC|||t|S)zx Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ) _map_asyncrr?rrSiterable chunksizerrrrjszPool.mapcCr)z Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). )rrr?rrrrrqsz Pool.starmapcC|||t|||S)z= Asynchronous version of `starmap()` method. )rrrrSrrcallbackerror_callbackrrr starmap_asyncys zPool.starmap_asyncc csnzd}t|D] \}}||||fifVqWdSty6}z||dt|fifVWYd}~dSd}~ww)zProvides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.rN) enumeraterFrG)r result_jobrSrrRxrVrrr_guarded_task_generations$zPool._guarded_task_generationrcC||dkrt|}|j||j|||jf|S|dkr(td|t |||}t|}|j||jt ||jfdd|DS)zP Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. rzChunksize must be 1+, not {0:n}cs|] }|D]}|VqqdSr rrchunkr_rrr zPool.imap..) r IMapIteratorrmr>r_job _set_lengthrur=r _get_tasksrrrSrrrU task_batchesrrrimaps4z Pool.imapcCr)zL Like `imap()` method but ordering of results is arbitrary. rzChunksize must be 1+, not {0!r}csrr rrrrrrrz&Pool.imap_unordered..) rIMapUnorderedIteratorrmr>rrrrur=rrrrrrrimap_unordereds0zPool.imap_unorderedcCs6|t|||}|j|jd|||fgdf|S)z; Asynchronous version of `apply()` method. rN)r ApplyResultrmr>r)rrSrrTrrrUrrrrs zPool.apply_asynccCr)z9 Asynchronous version of `map()` method. )rrrrrr map_asyncszPool.map_asyncc Cs|t|ds t|}|dur%tt|t|jd\}}|r%|d7}t|dkr-d}t|||}t||t|||d} |j | | j ||df| S)zY Helper function to implement map, starmap and their async counterparts. __len__Nrrr) rr@rdivmodrrgrr MapResultrmr>rr) rrSrmapperrrrextrarrUrrrrs,  zPool._map_asynccCs,t||d|s||r dSdS)N)timeout)remptyr?)rchange_notifierrrrr_wait_for_updatess zPool._wait_for_updatesc Cst}|jtks|r9|jtkr9|||||||| | | | g||| }||||jtks|r9|jtks|dt ddS)Nzworker handler exiting) r}current_threadrhr r rrrr>rrE)rcache taskqueuerbrarrrIrJrKrLrrNrrthreadcurrent_sentinelsrrrrs  zPool._handle_workersc Cstt}t|jdD]z\}}d}zm|D]D}|jtkr!tdnTz||WqtyW} z$|dd\} } z ||  | d| fWn t yLYnwWYd} ~ qd} ~ ww|rmtd|re|dnd} || dWd}}} q Wd}}} n d}}} wtdztd| dtd |D]} |dqWnt ytd Ynwtd dS) Nz'task handler found thread._state != RUNFzdoing set_length()rrztask handler got sentinelz/task handler sending sentinel to result handlerz(task handler sending sentinel to workersz/task handler got OSError when sending sentinelsztask handler exiting) r}riterr?rhr rrErF_setKeyErrorr>rD) rr>rJrrrtaskseq set_lengthrPrVrQidxrrrrrsN            zPool._handle_tasksc Cst} z|}WnttfytdYdSw|jtkr0|jtks*Jdtdn*|dur:tdn |\}}}z || ||Wn t yRYnwd}}}q|r|jtkrz|}WnttfywtdYdSw|durtdqZ|\}}}z || ||Wn t yYnwd}}}|r|jtksat |drtdzt d D] }|j sn|qWn ttfyYnwtd t||jdS) Nrz.result handler got EOFError/OSError -- exitingzThread not in TERMINATEz,result handler found thread._state=TERMINATEzresult handler got sentinelz&result handler ignoring extra sentinelrBz"ensuring that outqueue is not full z7result handler exiting: len(cache)=%s, thread._state=%s)r}rrDrCrrErhr r rrr@rrBpollr)rJr?rrrPrQrRobjrrrr=sn                   zPool._handle_resultsccs0t|} tt||}|sdS||fVqr )rtuplerislice)rSitsizerrrrrys zPool._get_taskscCstd)Nz:pool objects cannot be passed between processes or pickled)NotImplementedErrorr rrrr-szPool.__reduce__cCs6td|jtkrt|_t|j_|jddSdS)Nz closing pool)rrErhr r rrnr>r rrrrAs  z Pool.closecCstdt|_|dS)Nzterminating pool)rrEr rhrr rrrr{s  zPool.terminatecCshtd|jtkrtd|jttfvrtd|j|j |j |j D]}|q+dS)Nz joining poolzPool is still runningzIn unknown state) rrErhr rur r rr*rrrg)rrrrrr*s       z Pool.joincCs\td|j|r(|jr,|jt d|r*|jsdSdSdSdS)Nz7removing tasks from inqueue until task handler finishedr) rrE_rlockacquireis_aliverBrrtimesleep)rI task_handlerrrrr_help_stuff_finishs    "zPool._help_stuff_finishc CsVtdt|_|dt|_td|||t||s,t| dkr,tdt|_|d|dtdt |urH| |rdt |ddrdtd|D] } | j durc| qXtdt |urs| td t |ur| |rt |ddrtd |D]} | rtd | j| qdSdSdS) Nzfinalizing poolz&helping task handler/workers to finishrz.Cannot have cache with result_hander not alivezjoining worker handlerr{zterminating workerszjoining task handlerzjoining result handlerzjoining pool workersr)rrEr rhr>rrrr<r}rr*r@rzr{pid) rrrIrJrrworker_handlerrresult_handlerrrrrrrsJ              zPool._terminate_poolcCs ||Sr )rr rrr __enter__szPool.__enter__cCs |dSr )r{)rexc_typeexc_valexc_tbrrr__exit__rz Pool.__exit__)NNrNNr )NNN)r)-r"r#r$r7r staticmethodrarwarningswarnr rr6r|rrryrrrjrrrrrrrrrrrr classmethodrrrrr-rAr{r*rrrrrrrrrsx  S                - ;   5 c@sJeZdZddZddZddZddd Zdd d Zd d Ze e j Z dS)rcCs>||_t|_tt|_|j|_||_||_ ||j|j<dSr ) rgr}Event_eventnext job_counterrro _callback_error_callback)rrrrrrrrs  zApplyResult.__init__cCs |jSr )r is_setr rrrreadyrzApplyResult.readycCs|s td||jS)Nz{0!r} not ready)rrur=_successr rrr successfulszApplyResult.successfulNcCs|j|dSr )r rrrrrrrr.zApplyResult.waitcCs(|||s t|jr|jS|jr )rrrr_valuerrrrr?s zApplyResult.getcCsZ|\|_|_|jr|jr||j|jr|js||j|j|j|j=d|_dSr ) rrrrr setrorrgrrRrrrrrs        zApplyResult._setr ) r"r#r$rrrrr?rr types GenericAlias__class_getitem__rrrrrs     rc@r)rcCsjtj||||dd|_dg||_||_|dkr(d|_|j|j|j =dS||t |||_dS)NrTr) rrrr _chunksize _number_leftr rrorbool)rrrlengthrrrrrrs   zMapResult.__init__cCs|jd8_|\}}|r>|jr>||j||j|d|j<|jdkr<|jr-||j|j|j=|jd|_ dSdS|sI|jrId|_||_|jdkrf|j rW| |j|j|j=|jd|_ dSdS)NrrF) rrrrrrorr rrgr)rrRsuccess_resultsuccessrUrrrr)s*            zMapResult._setN)r"r#r$rrrrrrrs rc@s:eZdZddZddZd ddZeZdd Zd d ZdS) rcCsT||_tt|_tt|_|j|_t |_ d|_ d|_ i|_||j|j<dS)Nr)rgr} ConditionLock_condr r rro collectionsdeque_items_index_length _unsorted)rrrrrrGs  zIMapIterator.__init__cCs|Sr rr rrr__iter__RszIMapIterator.__iter__Nc Cs|jIz|j}Wn9tyD|j|jkrd|_td|j|z|j}WntyA|j|jkr>d|_tdt dwYnwWdn1sOwY|\}}|r\|S|r ) r#r&popleft IndexErrorr'r(rg StopIterationrr)rrr_r r2rrrr Us0     zIMapIterator.nextcCs|j\|j|kr<|j||jd7_|j|jvr6|j|j}|j||jd7_|j|jvs|jn||j|<|j|jkrW|j|j =d|_ WddSWddS1sbwYdSNr) r#r'r&rr)popnotifyr(rorrgrrrrrms"         "zIMapIterator._setcCsh|j'||_|j|jkr"|j|j|j=d|_WddSWddS1s-wYdSr )r#r(r'r0rorrg)rrrrrr~s   "zIMapIterator._set_lengthr ) r"r#r$rr*r __next__rrrrrrrEs  rc@seZdZddZdS)rcCs||j1|j||jd7_|j|j|jkr,|j|j=d|_WddSWddS1s7wYdSr.) r#r&rr'r0r(rorrgrrrrrs    "zIMapUnorderedIterator._setN)r"r#r$rrrrrrs rc@sVeZdZdZeddZdddZdd Zd d Zed d Z eddZ ddZ dS)rFcOsddlm}||i|S)Nrr`)dummyra)rbrrTrarrrras zThreadPool.ProcessNrcCst||||dSr )rr)rrrKrLrrrrszThreadPool.__init__cCs,t|_t|_|jj|_|jj|_dSr )rkrlrrr>rr?rr rrrrjs   zThreadPool._setup_queuescCs |jjgSr )rnrBr rrrr|rzThreadPool._get_sentinelscCsgSr rrrrrrrZz ThreadPool._get_worker_sentinelscCsBz |jddqtjyYnwt|D]}|dqdS)NTF)block)r?rkEmptyrr>)rIrrrRrrrrs   zThreadPool._help_stuff_finishcCst|dSr )rr)rrrrrrrrszThreadPool._wait_for_updates)NNr) r"r#r$rrrarrjr|rrrrrrrrs     )NrNF))__all__r$rrsrkr}rr'rrr&rrr connectionrr r r r countr rrrFrr%r,r0rXrGdictr[objectrr AsyncResultrrrrrrrrsP     -@++E