o [d@s$dZdZddlZddlZddlZddlZddlZddlZddlZzddl Z Wn e y1dZ Ynwddl m Z ddl m Z ddl mZddl mZdd l mZdd l mZdd l mZdd l mZdd lmZddZGddde jZGdddejejZGdddeZGdddeZdS)zEvent loop using a selector and related classes. A selector is a "notify-when-ready" multiplexer. For a subclass which also includes support for signal handling, see the unix_events sub-module. )BaseSelectorEventLoopN) base_events) constants)events)futures) protocols)sslproto) transports)trsock)loggercCs2z||}Wn tyYdSwt|j|@SNF)get_keyKeyErrorboolr)selectorfdeventkeyr./usr/lib/python3.10/asyncio/selector_events.py_test_selector_event s  rcseZdZdZdSfdd ZdSdddddZ dSddddejd d d Z dTd d Z fddZ ddZ ddZ ddZ ddZddZdddejfddZdddejfddZddejfdd Zd!d"Zd#d$Zd%d&Zd'd(Zd)d*Zd+d,Zd-d.Zd/d0Zd1d2Zd3d4ZdSd5d6Zd7d8Zd9d:Zd;d<Z d=d>Z!d?d@Z"dAdBZ#dCdDZ$dSdEdFZ%dGdHZ&dIdJZ'dKdLZ(dMdNZ)dOdPZ*dQdRZ+Z,S)UrzJSelector event loop. See events.EventLoop for API specification. NcsFt|dur t}td|jj||_| t |_ dS)NzUsing selector: %s) super__init__ selectorsDefaultSelectorr debug __class____name__ _selector_make_self_pipeweakrefWeakValueDictionary _transports)selfrrrrr1s zBaseSelectorEventLoop.__init__extraservercCt||||||SN)_SelectorSocketTransport)r$sockprotocolwaiterr'r(rrr_make_socket_transport;s z,BaseSelectorEventLoop._make_socket_transportF) server_sideserver_hostnamer'r(ssl_handshake_timeoutc Cs0tj||||||| d} t||| ||d| jS)N)r2r&)r SSLProtocolr+_app_transport) r$rawsockr- sslcontextr.r0r1r'r(r2 ssl_protocolrrr_make_ssl_transport@sz)BaseSelectorEventLoop._make_ssl_transportcCr)r*)_SelectorDatagramTransport)r$r,r-addressr.r'rrr_make_datagram_transportMsz.BaseSelectorEventLoop._make_datagram_transportcsP|rtd|rdS|t|jdur&|jd|_dSdS)Nz!Cannot close a running event loop) is_running RuntimeError is_closed_close_self_pipercloserr$r%rrr@Rs    zBaseSelectorEventLoop.closecCsB||j|jd|_|jd|_|jd8_dS)Nr)_remove_reader_ssockfilenor@_csock _internal_fdsrArrrr?]s   z&BaseSelectorEventLoop._close_self_pipecCsNt\|_|_|jd|jd|jd7_||j|jdS)NFr) socket socketpairrCrE setblockingrF _add_readerrD_read_from_selfrArrrr es   z%BaseSelectorEventLoop._make_self_pipecCsdSr*rr$datarrr_process_self_datamz(BaseSelectorEventLoop._process_self_datacCsN z|jd}|s WdS||WntyYqty%YdSwq)NTi)rCrecvrNInterruptedErrorBlockingIOErrorrLrrrrKps   z%BaseSelectorEventLoop._read_from_selfcCsR|j}|dur dSz|dWdSty(|jr%tjdddYdSYdSw)Nz3Fail to write a null byte into the self-pipe socketTexc_info)rEsendOSError_debugr r)r$csockrrr_write_to_self|s  z$BaseSelectorEventLoop._write_to_selfdc Cs"|||j||||||dSr*)rJrD_accept_connection)r$protocol_factoryr,r6r(backlogr2rrr_start_servings z$BaseSelectorEventLoop._start_servingc Cst|D]}}z|\}} |jrtd|| ||dWnOtttfy,YdSt ym} z6| j t j t j t j t jfvrb|d| t|d|||tj|j||||||nWYd} ~ qd} ~ wwd| i} |||| |||} || qdS)Nz#%r got a new connection from %r: %rFz&socket.accept() out of system resource)message exceptionrGpeername)rangeacceptrXr rrIrRrQConnectionAbortedErrorrWerrnoEMFILEENFILEENOBUFSENOMEMcall_exception_handlerr TransportSocketrBrD call_laterrACCEPT_RETRY_DELAYr__accept_connection2 create_task) r$r]r,r6r(r^r2_connaddrexcr'rdrrrr\sH      z(BaseSelectorEventLoop._accept_connectionc sd}d}z7|}|} |r|j|||| d|||d}n |j||| ||d}z | IdHWWdSty<|wttfyFty|} z+|jrqd| d} |dur]|| d<|dure|| d<|| WYd} ~ dSWYd} ~ dSd} ~ ww)NT)r.r0r'r(r2)r.r'r(z3Error on transport creation for incoming connection)r`rar- transport) create_futurer8r/ BaseExceptionr@ SystemExitKeyboardInterruptrXrk) r$r]rrr'r6r(r2r-rur.rtcontextrrrrosJ z)BaseSelectorEventLoop._accept_connection2c Cs|}t|ts"zt|}Wntttfy!td|dwz|j|}Wn ty3YdSw|sBt d|d|dS)NzInvalid file object: zFile descriptor z is used by transport ) isinstanceintrDAttributeError TypeError ValueErrorr#r is_closingr=)r$rrDrurrr_ensure_fd_no_transports&  z-BaseSelectorEventLoop._ensure_fd_no_transportc Gs|t|||d}z|j|}Wnty)|j|tj|dfY|Sw|j|j }\}}|j ||tjB||f|durH| |Sr*) _check_closedrHandlerrrregisterr EVENT_READrMmodifycancel r$rcallbackargshandlermaskreaderwriterrrrrJ"  z!BaseSelectorEventLoop._add_readercCs|rdSz|j|}Wn tyYdSw|j|j}\}}|tjM}|s1|j|n |j ||d|f|durE| dSdS)NFT) r>rrrrrMrr unregisterrrr$rrrrrrrrrB s   z$BaseSelectorEventLoop._remove_readerc Gs|t|||d}z|j|}Wnty)|j|tjd|fY|Sw|j|j }\}}|j ||tjB||f|durH| |Sr*) rrrrrrrr EVENT_WRITErMrrrrrr _add_writer!rz!BaseSelectorEventLoop._add_writercCs|rdSz|j|}Wn tyYdSw|j|j}\}}|tjM}|s1|j|n |j |||df|durE| dSdS)Remove a writer callback.FNT) r>rrrrrMrrrrrrrrr_remove_writer1s   z$BaseSelectorEventLoop._remove_writercG"|||j||g|RdS)zAdd a reader callback.N)rrJr$rrrrrr add_readerH z BaseSelectorEventLoop.add_readercC||||S)zRemove a reader callback.)rrBr$rrrr remove_readerM  z#BaseSelectorEventLoop.remove_readercGr)zAdd a writer callback..N)rrrrrr add_writerRrz BaseSelectorEventLoop.add_writercCr)r)rrrrrr remove_writerWrz#BaseSelectorEventLoop.remove_writerc t||jr|dkrtdz||WSttfy$Ynw|}| }| || ||j |||}| tj|j||d|IdHS)zReceive data from the socket. The return value is a bytes object representing the data received. The maximum amount of data to be received at once is specified by nbytes. rthe socket must be non-blockingrN)r_check_ssl_socketrX gettimeoutrrPrRrQrvrDrrJ _sock_recvadd_done_callback functoolspartial_sock_read_done)r$r,nfutrrrrr sock_recv\s"    zBaseSelectorEventLoop.sock_recvcC"|dus|s||dSdSr*) cancelledrr$rrrrrrrrz%BaseSelectorEventLoop._sock_read_donec C|rdSz||}Wn,ttfyYdSttfy"ty9}z ||WYd}~dSd}~ww||dSr*) donerPrRrQrxryrw set_exception set_result)r$rr,rrMrtrrrrvsz BaseSelectorEventLoop._sock_recvc r)zReceive data from the socket. The received data is written into *buf* (a writable buffer). The return value is the number of bytes written. rrrN)rrrXrr recv_intorRrQrvrDrrJ_sock_recv_intorrrr)r$r,bufrrrrrrsock_recv_intos"    z$BaseSelectorEventLoop.sock_recv_intoc Crr*) rrrRrQrxryrwrr)r$rr,rnbytesrtrrrrsz%BaseSelectorEventLoop._sock_recv_intoc st||jr|dkrtdz||}Wn ttfy'd}Ynw|t|kr0dS| }| }| || ||j ||t||g}|tj|j||d|IdHS)aSend data to the socket. The socket must be connected to a remote socket. This method continues to send data from data until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully processed by the receiving end of the connection. rrNr)rrrXrrrVrRrQlenrvrDrr _sock_sendall memoryviewrrr_sock_write_done)r$r,rMrrrrrrr sock_sendalls*     z"BaseSelectorEventLoop.sock_sendallc Cs|rdS|d}z |||d}Wn,ttfy!YdSttfy*tyA}z ||WYd}~dSd}~ww||7}|t|krS| ddS||d<dS)Nr) rrVrRrQrxryrwrrr)r$rr,viewposstartrrtrrrrs$   z#BaseSelectorEventLoop._sock_sendallcst||jr|dkrtd|jtjks"tjr;|jtj kr;|j ||j|j |j |dIdH}|d\}}}}}| }||||z|IdHWd}Sd}w)zTConnect to a remote socket at address. This method is a coroutine. rr)familytypeprotoloopN)rrrXrrrrGAF_INET _HAS_IPv6AF_INET6_ensure_resolvedrrrv _sock_connect)r$r,r:resolvedrqrrrr sock_connects$      z"BaseSelectorEventLoop.sock_connectc Cs|}zaz||WnEttfy2|||||j|||}|tj |j ||dYn*t t fy;t yQ}z ||WYd}~nd}~ww|dWd}dSWd}dSWd}dSd}w)Nr)rDconnectrRrQrr_sock_connect_cbrrrrrxryrwrr)r$rr,r:rrrtrrrrs.    z#BaseSelectorEventLoop._sock_connectcCrr*)rrrrrrrrz&BaseSelectorEventLoop._sock_write_donec Cs|rdSzUz|tjtj}|dkrt|d|Wn*ttfy(Yn*tt fy1t yG}z | |WYd}~nd}~ww| dWd}dSWd}dSWd}dSd}w)NrzConnect call failed ) r getsockoptrG SOL_SOCKETSO_ERRORrWrRrQrxryrwrr)r$rr,r:errrtrrrrs* z&BaseSelectorEventLoop._sock_connect_cbcsDt||jr|dkrtd|}||||IdHS)aWAccept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new socket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. rrN)rrrXrrrv _sock_accept)r$r,rrrr sock_accept*s   z!BaseSelectorEventLoop.sock_acceptc Cs|}z |\}}|dWnFttfy7|||||j||}|t j |j ||dYdSt t fy@tyW}z ||WYd}~dSd}~ww|||fdS)NFr)rDrdrIrRrQrrJrrrrrrxryrwrr)r$rr,rrrr:rrtrrrr9s"   z"BaseSelectorEventLoop._sock_acceptc s|j|j=|}||IdHz|j|j|||ddIdHW||r.|||j|j<S||r?|||j|j<w)NF)fallback) r#_sock_fd is_reading pause_reading_make_empty_waiter sock_sendfile_sock_reset_empty_waiterresume_reading)r$transpfileoffsetcountrrrr_sendfile_nativeJs"  z&BaseSelectorEventLoop._sendfile_nativecCs|D];\}}|j|j}\}}|tj@r&|dur&|jr!||n|||tj@r=|dur=|jr8||q||qdSr*) fileobjrMrr _cancelledrB _add_callbackrr)r$ event_listrrrrrrrr_process_eventsXs     z%BaseSelectorEventLoop._process_eventscCs|||dSr*)rBrDr@)r$r,rrr _stop_servingfs z#BaseSelectorEventLoop._stop_servingr*NNN)-r __module__ __qualname____doc__rr/rSSL_HANDSHAKE_TIMEOUTr8r;r@r?r rNrKrZr_r\rorrJrBrrrrrrrrrrrrrrrrrrrrrr __classcell__rrr%rr+sl         . )  rcseZdZdZeZdZdfdd ZddZddZ d d Z d d Z d dZ ddZ ejfddZdddZddZddZddZddZZS) _SelectorTransportiNcst||t||jd<z ||jd<Wnty&d|jd<Ynwd|jvrEz ||jd<Wntj yDd|jd<Ynw||_ | |_ d|_ ||||_||_d|_d|_|jdurm|j||j|j <dS)NrGsocknamerbFr)rrr rl_extra getsocknamerW getpeernamerGerrorrrDr_protocol_connected set_protocol_server_buffer_factory_buffer _conn_lost_closing_attachr#)r$rr,r-r'r(r%rrrws0       z_SelectorTransport.__init__cCs|jjg}|jdur|dn|jr|d|d|j|jdurc|jsct|jj |jt j }|r=|dn|dt|jj |jt j }|rQd}nd}| }|d|d |d d d |S) Nclosedclosingzfd=z read=pollingz read=idlepollingidlezwrite=z<{}> )rrrappendrr_loopr>rrrrrget_write_buffer_sizeformatjoin)r$inforstatebufsizerrr__repr__s.      z_SelectorTransport.__repr__cCs|ddSr*) _force_closerArrrabortsz_SelectorTransport.abortcCs||_d|_dSNT) _protocolrr$r-rrrrs z_SelectorTransport.set_protocolcC|jSr*)rrArrr get_protocolz_SelectorTransport.get_protocolcCrr*)rrArrrrrz_SelectorTransport.is_closingcCsX|jrdSd|_|j|j|js*|jd7_|j|j|j|jddSdSNTr) rrrBrrrr call_soon_call_connection_lostrArrrr@sz_SelectorTransport.closecCs0|jdur|d|t|d|jdSdS)Nzunclosed transport )source)rResourceWarningr@)r$_warnrrr__del__s z_SelectorTransport.__del__Fatal error on transportcCsNt|tr|jrtjd||ddn |j||||jd||dS)Nz%r: %sTrT)r`rarur-) r{rWr get_debugr rrkrr )r$rtr`rrr _fatal_errors  z_SelectorTransport._fatal_errorcCsd|jrdS|jr|j|j|j|js!d|_|j|j|jd7_|j|j |dSr) rrclearrrrrrBrrr$rtrrrr s z_SelectorTransport._force_closecCsz*|jr |j|W|jd|_d|_d|_|j}|dur)|d|_dSdS|jd|_d|_d|_|j}|durG|d|_wr*)rrconnection_lostrr@rr_detach)r$rtr(rrrrs*    z(_SelectorTransport._call_connection_lostcCs t|jSr*)rrrArrrr z(_SelectorTransport.get_write_buffer_sizecGs$|jrdS|jj||g|RdSr*)rrrJrrrrrJsz_SelectorTransport._add_reader)NN)r)rrrmax_size bytearrayrrrr r rrrr@warningswarnrrr rrrJrrrr%rrks"  rcseZdZdZejjZ  d#fdd ZfddZ ddZ d d Z d d Z d dZ ddZddZddZddZddZddZddZfddZdd Zd!d"ZZS)$r+TNcsd|_t|||||d|_d|_d|_t|j|j |j j ||j |j |j|j|dur?|j tj|ddSdSr )_read_ready_cbrr_eof_paused _empty_waiterr _set_nodelayrrrrconnection_maderJr _read_readyr_set_result_unless_cancelled)r$rr,r-r.r'r(r%rrrs   z!_SelectorSocketTransport.__init__cs.t|tjr |j|_n|j|_t|dSr*)r{rBufferedProtocol_read_ready__get_bufferr'_read_ready__data_receivedrrrr%rrrs  z%_SelectorSocketTransport.set_protocolcCs|j o|j Sr*)r)rrArrrrsz#_SelectorSocketTransport.is_readingcCsB|js|jrdSd|_|j|j|jrtd|dSdS)NTz%r pauses reading)rr)rrBrrr rrArrrr"s  z&_SelectorSocketTransport.pause_readingcCsD|js|jsdSd|_||j|j|jr td|dSdS)NFz%r resumes reading) rr)rJrr-rrr rrArrrr*s  z'_SelectorSocketTransport.resume_readingcCs |dSr*)r'rArrrr-2s z$_SelectorSocketTransport._read_readyc CsD|jrdSz|jd}t|stdWn"ttfyty7}z ||dWYd}~dSd}~wwz|j |}Wn-t t fyLYdSttfyUtym}z ||dWYd}~dSd}~ww|sv| dSz |j|WdSttfyty}z ||dWYd}~dSd}~ww)Nz%get_buffer() returned an empty bufferz/Fatal error: protocol.get_buffer() call failed.$Fatal read error on socket transportz3Fatal error: protocol.buffer_updated() call failed.)rr get_bufferrr=rxryrwrrrrRrQ_read_ready__on_eofbuffer_updated)r$rrtrrrrr05sP  z0_SelectorSocketTransport._read_ready__get_bufferc Cs|jrdSz |j|j}Wn-ttfyYdSttfy#ty;}z | |dWYd}~dSd}~ww|sD| dSz |j |WdSttfyWtyo}z | |dWYd}~dSd}~ww)Nr3z2Fatal error: protocol.data_received() call failed.) rrrPr#rRrQrxryrwrr5r data_received)r$rMrtrrrr1Zs4 z3_SelectorSocketTransport._read_ready__data_receivedc Cs|jr td|z|j}Wn"ttfyty4}z | |dWYd}~dSd}~ww|r@|j |j dS| dS)Nz%r received EOFz1Fatal error: protocol.eof_received() call failed.) rrr rr eof_receivedrxryrwrrBrr@)r$ keep_openrtrrrr5ss   z,_SelectorSocketTransport._read_ready__on_eofc Cs(t|tttfstdt|j|jrtd|j dur"td|s&dS|j r=|j t j kr4t d|j d7_ dS|jsz|j|}Wn,ttfySYn,ttfy\tyt}z ||dWYd}~dSd}~ww||d}|sdS|j|j|j|j||dS)N/data argument must be a bytes-like object, not z%Cannot call write() after write_eof()z(unable to write; sendfile is in progresssocket.send() raised exception.r%Fatal write error on socket transport)r{bytesr$rr~rrr(r=r*rr!LOG_THRESHOLD_FOR_CONNLOST_WRITESr warningrrrVrRrQrxryrwrrrr _write_readyextend_maybe_pause_protocol)r$rMrrtrrrwritesB       z_SelectorSocketTransport.writec Cs8|jsJd|jr dSz |j|j}WnKttfy!YdSttfy*ty`}z+|j |j |j | |d|jdurU|j|WYd}~dSWYd}~dSd}~ww|ri|jd|=||js|j |j |jdur|jd|jr|ddS|jr|jtjdSdSdS)NzData should not be emptyr<)rrrrVrRrQrxryrwrrrrrr*r_maybe_resume_protocolrrrr(shutdownrGSHUT_WR)r$rrtrrrr@s>      z%_SelectorSocketTransport._write_readycCs2|js|jrdSd|_|js|jtjdSdSr)rr(rrrErGrFrArrr write_eofs z"_SelectorSocketTransport.write_eofcCsdSrrrArrr can_write_eofrOz&_SelectorSocketTransport.can_write_eofcs.t||jdur|jtddSdS)NzConnection is closed by peer)rrr*rConnectionErrorrr%rrrs  z._SelectorSocketTransport._call_connection_lostcCs6|jdur td|j|_|js|jd|jS)NzEmpty waiter is already set)r*r=rrvrrrArrrrs   z+_SelectorSocketTransport._make_empty_waitercCs d|_dSr*)r*rArrrrr"z,_SelectorSocketTransport._reset_empty_waiterr)rrr_start_tls_compatibler _SendfileMode TRY_NATIVE_sendfile_compatiblerrrrrr-r0r1r5rCr@rGrHrrrrrrr%rr+s* %' r+csJeZdZejZ  d fdd ZddZddZd dd Z d d Z Z S)r9Ncsbt||||||_|j|jj||j|j|j|j |dur/|jt j |ddSdSr*) rr_addressrrrr,rJrr-rr.)r$rr,r-r:r.r'r%rrrs  z#_SelectorDatagramTransport.__init__cCstdd|jDS)Ncss|] \}}t|VqdSr*)r).0rMrqrrr szC_SelectorDatagramTransport.get_write_buffer_size..)sumrrArrrrsz0_SelectorDatagramTransport.get_write_buffer_sizec Cs|jrdSz |j|j\}}WnEttfyYdSty4}z |j|WYd}~dSd}~wt t fy=t yU}z | |dWYd}~dSd}~ww|j ||dS)Nz&Fatal read error on datagram transport)rrrecvfromr#rRrQrWrerror_receivedrxryrwrdatagram_receivedr$rMrsrtrrrr-s z&_SelectorDatagramTransport._read_readyc Cs|t|tttfstdt|j|sdS|jr+|d|jfvr(td|j|j}|j rE|jrE|j t j krr r?rrrrVsendtorRrQrrr _sendto_readyrWrrSrxryrwrrrBrUrrrrW sR        z!_SelectorDatagramTransport.sendtoc Cs|jro|j\}}z|jdr|j|n|j||WnLttfy2|j||fYn=t yJ}z |j |WYd}~dSd}~wt t fyStyk}z ||dWYd}~dSd}~ww|js||js|j|j|jr|ddSdSdS)NrbrV)rpopleftrrrVrWrRrQ appendleftrWrrSrxryrwrrDrrrrrrUrrrrX8s>  z(_SelectorDatagramTransport._sendto_readyrr*) rrr collectionsdequerrrr-rWrXrrrr%rr9s  +r9)r__all__r[rfrrrGr%r!ssl ImportErrorrrrrrr r r logr r BaseEventLoopr_FlowControlMixin Transportrr+r9rrrrsH            Do