o ~_;*@sdZddlmZeddlmZddlZddlmZddlm Z ddlm Z ddlm Z dd lm Z ej Zej ZGd d d eZdS) z_ Asynchronous job scheduler, for concurrent execution with minimalistic dependency guarantees. )standard_library)objectN)log)require_threading)interruptably_wait) async_split) with_lockc@sPeZdZdZddZddZddZdd Zd d Zd d Z ddZ ddZ dS)AsyncSchedulera Easy-to-use scheduler of function calls to be executed concurrently. A very simple dependency mechanism exists in the form of barriers (see insert_barrier()). Each instance has a concurrency level associated with it. A concurrency of 0 implies that all tasks will be executed synchronously when scheduled. A concurrency of 1 indicates that a task will be executed asynchronously, but never concurrently with other tasks. Both 0 and 1 guarantee strict ordering among all tasks (i.e., they will be executed in the order scheduled). At concurrency levels above 1, the tasks will end up being executed in an order undetermined except insofar as is enforced by calls to insert_barrier(). An AsynchScheduler should be created for any independent process; the scheduler will assume that if any background job fails (raises an exception), it makes further work moot. cCstd|jjtd|f|dksJd|jjfd|_d|_||_d|_d|_ d|_ t |_ |dkr@td|fdSdS)zr Create an asynchronous scheduler that executes jobs with the given level of concurrency. %s: %szinstantiating at concurrency %drz!%s concurrency level must be >= 0FNzconcurrency > 0 (%d))rInfo __class____name____AsyncScheduler__failed_AsyncScheduler__failed_waiter_AsyncScheduler__concurrency_AsyncScheduler__worker_count_AsyncScheduler__waiter_count_AsyncScheduler__barrier threading Condition_AsyncScheduler__cvr)self concurrencyr:/usr/lib/python3/dist-packages/duplicity/asyncscheduler.py__init__@s  zAsyncScheduler.__init__csDtdjjtdfjdkr fdd}tj|dSdS)aL Proclaim that any tasks scheduled prior to the call to this method MUST be executed prior to any tasks scheduled after the call to this method. The intended use case is that if task B depends on A, a barrier must be inserted in between to guarantee that A happens before B. r zinserting barrierrcs d_dSNT)rrrrr_insert_barrierfs z6AsyncScheduler.insert_barrier.._insert_barrierN)rDebugr r rrrr)rrrrrinsert_barrierWs   zAsyncScheduler.insert_barriercCsn|dusJ|jdkr!td|jjtdftjj|||Std|jjtdftjj | ||S)a Schedule the given task (callable, typically function) for execution. Pass the given parameters to the function when calling it. Returns a callable which can optionally be used to wait for the task to complete, either by returning its return value or by propagating any exception raised by said task. This method may block or return immediately, depending on the configuration and state of the scheduler. This method may also raise an exception in order to trigger failures early, if the task (if run synchronously) or a previous task has already failed. NOTE: Pay particular attention to the scope in which this is called. In particular, since it will execute concurrently in the background, assuming fn is a closure, any variables used most be properly bound in the closure. This is the reason for the convenience feature of being able to give parameters to the call, to avoid having to wrap the call itself in a function in order to "fixate" variables in, for example, an enclosing loop. Nrr z4running task synchronously (asynchronicity disabled)z*scheduling task for asynchronous execution) rrr r r rInfoCodesynchronous_upload_begin"_AsyncScheduler__run_synchronouslyasynchronous_upload_begin#_AsyncScheduler__run_asynchronously)rfnparamsrrr schedule_taskks     zAsyncScheduler.schedule_taskcsfdd}tj|dS)aD Wait for the scheduler to become entirely empty (i.e., all tasks having run to completion). IMPORTANT: This is only useful with a single caller scheduling tasks, such that no call to schedule_task() is currently in progress or may happen subsequently to the call to wait(). cstjfdddS)Ncsjdko jdkS)Nr)rrrrrrsz4AsyncScheduler.wait.._wait..)rrrrrr_waitsz"AsyncScheduler.wait.._waitN)rr)rr+rrrwaits zAsyncScheduler.waitcs8||fdd}td|jjtdftjj|S)NcsSNrrretrr_waitersz3AsyncScheduler.__run_synchronously.._waiterr ztask completed successfully)rr r r rr"synchronous_upload_done)rr'r(r0rr.r__run_synchronouslys  z"AsyncScheduler.__run_synchronouslycsJtfdd\}}fddfdd}tj|||S)NcsSr-rr)r'r(rrr*sz5AsyncScheduler.__run_asynchronously..csDjr tdjjtdftjjt djjfdS)Nr zJa previously scheduled task has failed; propagating the result immediatelyz9%s: waiter should have raised an exception; this is a bug) rrr r r rr"asynchronous_upload_donerAssertionErrorrrrrcheck_pending_failures zBAsyncScheduler.__run_asynchronously..check_pending_failurecsjjks jr@jdkr!jsJdd_jnjd7_jjd8_jjks js jd7_tdj j t djffdS)Nrzbarrier should be in effectFr active workers = %d) rrrr notifyAllrr,rr r r rr)r5rrrwait_for_and_register_launchs     zIAsyncScheduler.__run_asynchronously..wait_for_and_register_launch)rrr_AsyncScheduler__start_worker)rr'r(waitercallerr9r)r5r'r(rr__run_asynchronouslys    z#AsyncScheduler.__run_asynchronouslycsfdd}t|ddS)z% Start a new worker. c sDzWfdd}tj|dSfdd}tj|w)Ncs>jd8_tdjjtdjffjdS)Nr6r r7)rrr r r rrr8rrrrcomplete_workers  zJAsyncScheduler.__start_worker..trampoline..complete_worker)_AsyncScheduler__execute_callerrr)r>r<rrr trampolines    z1AsyncScheduler.__start_worker..trampolinerN)threadstart_new_thread)rr<rArr@r__start_workers zAsyncScheduler.__start_workercsP|\}|sfdd}tj|tdjjtd|ftjjdS)Ncs$jsd__jdSdSr)rrrr8rrr;rr_signal_faileds z7AsyncScheduler.__execute_caller.._signal_failedr z!task execution done (success: %s)) rrrr r r rr"r3)rr< succeededrFrrEr__execute_callers    zAsyncScheduler.__execute_callerN) r __module__ __qualname____doc__rr!r)r,r$r&r:r?rrrrr *s2' r )rKfuturerinstall_aliasesbuiltinsr duplicityrduplicity.dup_threadingrrrr dup_threading thread_modulerBthreading_modulerr rrrrs