o 6ahK @sTddlZddlZddlZddlZddlZddlZddlZddlZddl Z ddl m Z m Z ddl mZmZmZmZmZddlmZddlmZddZd ed efd d ZGd ddZGdddZGdddZeZe   dd ddededd eedefgeffddZe  dd edefdededd efddZ   dddZdS)N)FutureThreadPoolExecutor)AnyCallableDictOptionaloverload)CurrentThreadExecutor)Localc CsV|D]&}z|||kr|||Wqty(|||YqwdSN)getset LookupError)contextcvarr./usr/lib/python3/dist-packages/asgiref/sync.py_restore_contexts rfuncreturncCsVtjdkr t|St|r|j}t|st|tj r&|j }t|tj st|S)N)) sys version_infoasyncioiscoroutinefunctioninspectismethod__func__ isinstance functoolspartialr)rrrr_iscoroutinefunction_or_partials     r#c@s(eZdZdZddZddZddZdS) ThreadSensitiveContextaiAsync context manager to manage context for thread sensitive mode This context manager controls which thread pool executor is used when in thread sensitive mode. By default, a single thread pool executor is shared within a process. In Python 3.7+, the ThreadSensitiveContext() context manager may be used to specify a thread pool per context. This context manager is re-entrant, so only the outer-most call to ThreadSensitiveContext will set the context. Usage: >>> import time >>> async with ThreadSensitiveContext(): ... await sync_to_async(time.sleep, 1)() cCs d|_dSr )tokenselfrrr__init__?s zThreadSensitiveContext.__init__cs6ztjW|Stytj||_Y|Swr ) SyncToAsyncthread_sensitive_contextr rrr%r&rrr __aenter__Bs  z!ThreadSensitiveContext.__aenter__cs8|jsdStj|d}|r|tj|jdSr )r%r)context_to_thread_executorpopshutdownr*reset)r'excvaluetbexecutorrrr __aexit__Jsz ThreadSensitiveContext.__aexit__N)__name__ __module__ __qualname____doc__r(r+r4rrrrr$+s  r$c@sNeZdZUdZiZded<eZdddZddZ d d Z d d Z d dZ dS) AsyncToSynca Utility class which turns an awaitable that only works on the thread with the event loop into a synchronous callable that works in a subthread. If the call stack contains an async loop, the code runs there. Otherwise, the code runs in a new loop in a new thread. Either way, this thread then pauses and waits to run any thread_sensitive code called from further down the call stack using SyncToAsync, before finally exiting once the async task returns. z,Dict[asyncio.Task[object], threading.Thread] launch_mapFcCst|rt|stjddd||_z|jj|_Wn ty"Ynw|r*d|_dSzt |_WdSt yYt t j dd}|rS|tkrSt t j dd|_YdSd|_YdSw)Nz4async_to_sync was passed a non-async-marked callable) stacklevelmain_event_loop_pidmain_event_loop)callabler#warningswarn awaitable__self__AttributeErrorr>rget_running_loop RuntimeErrorgetattrr) threadlocalosgetpid)r'rBforce_new_loopr=rrrr(hs0    zAsyncToSync.__init__c OsTzt}Wn tyYn w|rtdtg}t}t}t |j dr/|j j }nd}t }||j _ z[| ||||t|} |jrM|jsjt} tdd} | |j| | } |re|| | n|j|jj| |rz||Wt |j dr|j ` |r||j _ t|d|St |j dr|j ` |r||j _ t|dw)NznYou cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.currentr  max_workersr)rrErF is_running contextvars copy_contextr threadingcurrent_threadhasattr executorsrLr main_wraprexc_infor>new_event_looprsubmit_run_event_looprun_until_futureresultcall_soon_threadsafe create_taskr) r'argskwargs event_loopr call_result source_threadold_current_executorcurrent_executorrBloop loop_executor loop_futurerrr__call__s^            zAsyncToSync.__call__cst|zm||Wz[t|D]}|qfdd}||D]}|r0q)|durA|d||dq)t|dr[|| W| t|j dSW| t|j dS| t|j wzYt|D]}|q{fdd}||D]}|rq|dur|d||dqt|dr|| W| t|j wW| t|j w| t|j w)zP Runs the given event loop (designed to be called in a thread). cstjddiIdHdS)Nreturn_exceptionsT)rgatherrtasksrrrksz+AsyncToSync._run_event_loop..gatherNz(unhandled exception during loop shutdown)message exceptiontaskshutdown_asyncgens) rset_event_looprun_until_complete all_taskscancel cancelledrocall_exception_handlerrTrqcloser>)r'rfcororprkrrlrrZsn              zAsyncToSync._run_event_loopcCst|j|}t||jSz* Include self for methods )r!r"riupdate_wrapperrB)r'parentobjtyperrrr__get__szAsyncToSync.__get__c s|dur t|dt}||j|<zaz*|dr3z|dty2|j|i|IdH}Yn w|j|i|IdH}WntyV} z || WYd} ~ nd} ~ ww||W|j|=t |d<dSW|j|=t |d<dS|j|=t |d<w)zs Wraps the awaitable with something that puts the result into the result/exception future. Nrr ) rr)get_current_taskr: BaseExceptionrB set_exception set_resultrPrQ) r'r_r`rbrcrWr current_taskr\errrrVs4    zAsyncToSync.main_wrapN)F) r5r6r7r8r:__annotations__r rUr(rirZr~rVrrrrr9Ts   !E$ r9c @seZdZUdZdejvreZe e e ejddiZ de d<eZe ddZedZde d<ed Zd e d <eZd e d < ddedefdededddfddZddZddZddZ e!ddZ"dS) r)a Utility class which turns a synchronous callable into an awaitable that runs in a threadpool. It also sets a threadlocal inside the thread so calls to AsyncToSync can escape it. If thread_sensitive is passed, the code will run in the same thread as any outer code. This is needed for underlying Python code that is not threadsafe (for example, code which handles SQLite database connections). If the outermost program is async (i.e. SyncToAsync is outermost), then this will be a dedicated single sub-thread that all sync code runs in, one after the other. If the outermost program is sync (i.e. AsyncToSync is outermost), this will just be the main thread. This is achieved by idling with a CurrentThreadExecutor while AsyncToSync is blocking its sync parent, rather than just blocking. If executor is passed in, that will be used instead of the loop's default executor. In order to pass in an executor, thread_sensitive must be set to False, otherwise a TypeError will be raised. ASGI_THREADSrMz,Dict[threading.Thread, asyncio.Task[object]]r:r r*zcontextvars.ContextVar[str]deadlock_contextzcontextvars.ContextVar[bool]z5weakref.WeakKeyDictionary[object, ThreadPoolExecutor]r,TNr.thread_sensitiver3rrcCsxt|rt|r td||_t||||_tjj |_ |r'|dur'td||_ z|j |_ WdSt y;YdSw)Nz4sync_to_async can only be applied to sync functions.z6executor must not be set when thread_sensitive is True) r?r# TypeErrorrr!r{_thread_sensitiver coroutines _is_coroutine _executorrCrD)r'rrr3rrrr(Ss    zSyncToAsync.__init__c s`t}|jrQttjdrtjj}nA|jr7|jdr7|j}||j vr,|j |}n(t dd}||j |<n|j rD|j drDt d|j }|j rP|j dn|j}t}tj|jg|Ri|}|j}|f}i}z3||tj|j||t|g|Ri|} tj| ddIdH} Wt||j r|j d| St||j r|j dww)NrLr rMFz9Single thread executor already being used, would deadlockT)timeout)rrErrTr9rUrLr*r r,rrrFsingle_thread_executorrrrPrQr!r"rrunrun_in_executorthread_handlerrrrWwait_forr) r'r_r`rfr3r*rchildrfutureretrrrrigsf           zSyncToAsync.__call__cCst|j|Srz)r!r"ri)r'r|r}rrrr~szSyncToAsync.__get__c Os||j_t|j_t}tj ||krd}n||j|<d}z0|drBz|dt yA||i|YW|r@|j|=SSw||i|W|rP|j|=SS|rW|j|=w)zE Wraps the sync application with exception handling. FTr ) rHr>rIrJr=rRrSr9r:r r) r'rf source_taskrWrr_r`rS parent_setrrrrs,      zSyncToAsync.thread_handlercCs ztWStyYdSw)zi Implementation of asyncio.current_task() that returns None if there is no task. N)rrrFrrrrrs   zSyncToAsync.get_current_taskTN)#r5r6r7r8rIenvironrget_event_looprfset_default_executorrintr:rrRlocalrHrrP ContextVarr*rweakrefWeakKeyDictionaryr,rrboolrr(rir~r staticmethodrrrrrr)sD         @!r)Trr3r.cCdSr rrrr3rrr sync_to_asyncrcCrr rrrrrrrcs$|dur fddSt|dS)Ncst|dS)Nrr3r))fr3rrrs zsync_to_async..rrrrrrrs)NTNr)asyncio.coroutinesrrPr!rrIrrRr@rconcurrent.futuresrrtypingrrrrrcurrent_thread_executorr rr rrr#r$r9r) async_to_syncrrrrrsd   )FC