o ]Lb=@sDddlmZddlZddlZddlZddlZddlZddlZz ddlZej Wne y6ddl m ZYnwddl mZddlmZmZmZmZmZddZd d Zejrcd d ZGd ddeZndd ZddZejsqejrwdZejZndZdZd!ddZ d"ddZ ddZ!ddZ"ddZ#ejre#Z$ne!Z$e"Z%dd Z&dS)#)absolute_importN) selectors2)_)encodingerrorpycompatscmutilutilc Cstzttd}|dkr|WSWn ttfyYnwzttjd}|dkr+|WSWdSttfy9YdSw)z-try to count the number of CPUs on the systemSC_NPROCESSORS_ONLNrsNUMBER_OF_PROCESSORSr)intossysconfAttributeError ValueErrorrenvironKeyError)nr2/usr/lib/python3/dist-packages/mercurial/worker.py countcpus"s$rcCsZ|dd}|r$z t|}|dkr|WSWnty#ttdwtttddS)Nworkersnumcpusrs!number of cpus must be an integer ) configr rrAbortrminmaxr)uisrrrr _numworkers8s  r cCsttkSN) threadingcurrent_thread main_threadrrrr ismainthreadFsr%c@s&eZdZddZddZd ddZdS) _blockingreadercCs ||_dSr!)_wrapped)selfwrappedrrr__init__J z_blockingreader.__init__cCs |jSr!)r'readliner(rrrr,Qr+z_blockingreader.readlinecCsl|dkr |jSt|}t|}d}||kr,|j||d}|s$n||7}||ks~||d=t|SNr)r'readall bytearray memoryviewreadintobytes)r(sizebufviewposretrrrreadUs  z_blockingreader.readN)r.)__name__ __module__ __qualname__r*r,r:rrrrr&Isr&cCstttjSr!) isinstancer"r# _MainThreadrrrrr%jscCs|Sr!r)r)rrrr&osg{Gz?gꌠ9Y>)FFTcCs8|strdS||}t|}|t|||}|dkS)zetry to determine whether the benefit of multiple processes can outweigh the cost of starting themFg333333?)_DISALLOW_THREAD_UNSAFEr _STARTUP_COST)r costperopnops threadsafelinearworkersbenefitrrr worthwhile~s rHcCsX|dd}|rtturtsd}|r%t||t||dr%t|||||S|||fS)a0run a function, possibly in parallel in multiple worker processes. returns a progress iterator costperarg - cost of a single task func - function to run. It is expected to return a progress iterator. staticargs - arguments to pass to every invocation of the function args - arguments to split into chunks, to pass to individual workers hasretval - when True, func and the current function return an progress iterator then a dict (encoded as an iterator that yield many (False, ..) then a (True, dict)). The dicts are joined in some arbitrary order, so overlapping keys are a bad idea. threadsafe - whether work items are thread safe and can be executed using a thread-based worker. Should be disabled for CPU heavy tasks that don't release the GIL. rsenabledF)rD) configbool_platformworker _posixworkerr%rHlen)r costperargfunc staticargsargs hasretvalrDenabledrrrworkers rSc#st|}ttjttjtjtdgfdddfdd  fdd}ttj||t }gi}t |t |t |D]t \  fd } zlz)t} | dkrttjttj fd d } t|| } Wnt |krtd} t| t } |j| d YWt |krzz|WnYWt| d @n6t| d @wn-t |krzz|Wn YWt| d @wWt| d @wt| d @ww| qNt D]\ t  tddtjq fdd}zmt }|dkr D]X\}}zt j!"t#|j$}|rR|drR|%|dn|VWq4t&yq '|j$|j$|d8}Yq4t(y}z|j)t)j*krWYd}~q4d}~ww|dks0Wn ||}|r|dkrt+t | t,-||rd|fVdSdS)Nrc s`ttjD]$}z t|tjWq ty-}z |jtjkr#WYd}~q d}~wwdSr!)signalSIGCHLDr killSIGTERMOSErrorerrnoESRCH)perr)oldchldhandlerpidsrr killworkerss  z!_posixworker..killworkersTc sD]W}d}} zt||rdntj\}}Wn*tyD}z|jtjkr.WYd}~q |jtjkr?|WYd}~nd}~ww|sHq|t |}|r[ds[|d<qdSr/) copyr waitpidWNOHANGrXrYEINTRECHILDdiscard _exitstatus)blockingpidr[ste)r^problemrrwaitforworkerss.         z$_posixworker..waitforworkerscs dddrdSdS)NF)rgrr)signumframe)r_rkrlrrsigchldhandlers  z$_posixworker..sigchldhandlerr.cs`ddD]\}}t|t|qtfD] }ttj|q!dS)Nr.r)r closewriter pickledumps)rwresult)rNpargspipesrfdrOwfdrr workerfuncs   z _posixworker..workerfunc)forcerbcs2ttjttjdSr/)rTSIGINTrUrpr)r] oldhandlerrkselectorrlrrcleanups z_posixworker..cleanuprT).r rT getsignalrSIG_IGNsetrUflushr getpid partitionrrLpipeappendforkr callcatchsysexc_info issubclassKeyboardInterrupt traceback_exitadd selectorsDefaultSelectorrpregisterfdopen EVENT_READselectr rrloadr&fileobjupdateEOFError unregisterIOErrorrYrcrVr WorkerError)rrNrOrPrQrFro parentpidretvalr9rhr{exctyper|r openpipeskeyeventsresrjstatusr) rNr_r]rrwr^rxrkryrrOrlrzrrKs                     rKcCs.t|r t|St|rt| SdS)zconvert a posix exit status into the same form returned by os.spawnv returns None if the process was stopped instead of exitingN)r WIFEXITED WEXITSTATUS WIFSIGNALEDWTERMSIG)coderrr_posixexitstatusAs    rc #sGdddtj}gfdd}t}tj}tj} i} t||dD]} | | q*t|D]} || |||} | | q6zKt dkr| sm| }|rf|drf| |dn|V| rTddd d D}|D]} | jdur| j| q}t dksPWn ttfy|w| s| }|r|dr| |dn|V| r|rd | fVdSdS) Nc@s$eZdZddZddZddZdS)z_windowsworker..Workerc_sHtjj|g|Ri|||_||_||_||_d|_d|_d|_ dS)NFT) r"Threadr* _taskqueue _resultqueue_func _staticargs _interrupteddaemon exception)r( taskqueue resultqueuerNrOrPkwargsrrrr*Ns z'_windowsworker..Worker.__init__cSs d|_dS)NT)rr-rrr interruptZr+z(_windowsworker..Worker.interruptc Ssz?|js=z!|j}|j|j|fD]}|j||jr%WWdSqWn tj j y4YWdSw|jrWdSWdSt yO}z||_ d}~wwr!) rempty get_nowaitrrrputrrqueueEmpty Exceptionr)r(rPrrjrrrrun]s&     z"_windowsworker..Worker.runN)r;r<r=r*rrrrrrWorkerMs rcs^td}D]}|qD]}|t}|||r,tddSqdS)Nrs:failed to kill worker threads while handling an exception )timerjoinis_alivewarnr) cleanupendt remainingtimethreadsrrrtrykillworkersss    z&_windowsworker..trykillworkersrrg?cSsg|]}|s|qSr)r).0_trrr sz"_windowsworker..T)r"rr rrQueuerrrangerstartrLrgetrrrremoverr)rrNrOrPrQrrrFrrrrw_irrfinishedthreadsrrr_windowsworkerLsV$             rccs$t|D] }||d|VqdS)apartition a list into N slices of roughly equal size The current strategy takes every Nth element from the input. If we ever write workers that need to preserve grouping in input we should consider allowing callers to specify a partition strategy. olivia is not a fan of this partitioning strategy when files are involved. In his words: Single-threaded Mercurial makes a point of creating and visiting files in a fixed order (alphabetical). When creating files in order, a typical filesystem is likely to allocate them on nearby regions on disk. Thus, when revisiting in the same order, locality is maximized and various forms of OS and disk-level caching and read-ahead get a chance to work. This effect can be quite significant on spinning disks. I discovered it circa Mercurial v0.4 when revlogs were named by hashes of filenames. Tarring a repo and copying it to another disk effectively randomized the revlog ordering on disk by sorting the revlogs by hash and suddenly performance of my kernel checkout benchmark dropped by ~10x because the "working set" of sectors visited no longer fit in the drive's cache and the workload switched from streaming to random I/O. What we should really be doing is have workers read filenames from a ordered queue. This preserves locality and also keeps any worker from getting more than one file out of balance. N)r)lstnslicesirrrrs rr)FT)' __future__rrYr rTrr"rr BaseSelector ImportError thirdpartyri18nrrrrr r rr ispy3r%objectr&isposix iswindowsrAr@rHrSrKrrrJrfrrrrrsN      !   % c