
    d                         d dl Z d dlZd dlZd dlmZ d dlmZmZ d dlm	Z	 d dl
mZmZmZ d dlmZ d dlmZ d Z G d	 d
e      Zy)    N)logger)	add_eventWALAEventOperation)ServiceStoppedError)ustrQueueEmpty)ThreadHandlerInterface)textutilc                     t        |       S N)SendTelemetryEventsHandler)protocol_utils    J/usr/lib/python3/dist-packages/azurelinuxagent/ga/send_telemetry_events.py!get_send_telemetry_events_handlerr      s    %m44    c                       e Zd ZdZdZ ej                  d      j                  ZdZ	 ej                  d      Z
d Zed        Zd Zd	 Zd
 Zd Zd Zd Zd Zd Zd Zd Zd Zy)r   z
    This Handler takes care of sending all telemetry out of the agent to Wireserver. It sends out data as soon as
    there's any data available in the queue to send.
    SendTelemetryHandler   )seconds   c                 h    |j                         | _        d| _        d | _        t	               | _        y )NT)get_protocol	_protocol
should_run_threadr   _queue)selfr   s     r   __init__z#SendTelemetryEventsHandler.__init__.   s+    &335 gr   c                  "    t         j                  S r   )r   _THREAD_NAME r   r   get_thread_namez*SendTelemetryEventsHandler.get_thread_name<   s    )666r   c                 N    t        j                  d       | j                          y )Nz#Start SendTelemetryHandler service.)r   infostartr   s    r   runzSendTelemetryEventsHandler.run@   s    9:

r   c                 V    | j                   d uxr | j                   j                         S r   )r   is_aliver'   s    r   r*   z#SendTelemetryEventsHandler.is_aliveD   s#    ||4'CDLL,A,A,CCr   c                 
   t        j                  | j                        | _        | j                  j	                  d       | j                  j                  | j                                | j                  j                          y )N)targetT)	threadingThread_process_telemetry_threadr   	setDaemonsetNamer#   r&   r'   s    r   r&   z SendTelemetryEventsHandler.startG   sW     ''t/M/MNt$T1134r   c                 T    d| _         | j                         r| j                          yy)zO
        Stop server communication and join the thread to main thread.
        FN)r   r*   joinr'   s    r   stopzSendTelemetryEventsHandler.stopM   s"      ==?IIK r   c                 l    | j                   j                          | j                  j                          y r   )r   r3   r   r'   s    r   r3   zSendTelemetryEventsHandler.joinU   s"    r   c                     | j                    S r   )r   r'   s    r   stoppedz"SendTelemetryEventsHandler.stoppedY   s    ??""r   c                 4   | j                         r(t        dj                  | j                                     	 | j                  j                  |t        j                         y # t        $ r(}t        dj                  t        |                  d }~ww xY w)Nz,{0} is stopped, not accepting anymore eventstimeoutzMUnable to enqueue due to: {0}, stopping any more enqueuing until the next run)
r7   r   formatr#   r   putr   _MAX_TIMEOUT	Exceptionr   )r   eventerrors      r   enqueue_eventz(SendTelemetryEventsHandler.enqueue_event\   s    <<>%&T&[&[\`\p\p\r&stt	uKKOOE+E+R+ROS 	u%_ffgklqgrsu u	us   +A& &	B/#BBc                     	 | j                   j                  t        j                        }| j                   j	                          |S # t
        $ r d}Y |S w xY w)aE  
        Wait for atleast one event in Queue or timeout after SendTelemetryEventsHandler._MAX_TIMEOUT seconds.
        In case of a timeout, set the event to None.
        :return: event if an event is added to the Queue or None to signify no events were added in queue.
        This would raise in case of an error.
        r9   N)r   getr   r=   	task_doner	   )r   r?   s     r   _wait_for_event_in_queuez3SendTelemetryEventsHandler._wait_for_event_in_queuel   sT    	KKOO,F,S,SOTEKK!!#
 	  	E		s   AA AAc                 (   t        j                  dj                  | j                                      	 | j	                         r| j
                  j                         sP| j                         }|r| j                  |       | j	                         s4| j
                  j                         sOy y # t        $ rY}dj                  | j                         t        j                  |            }t        t        j                  |d       Y d }~y d }~ww xY w)Nz#Successfully started the {0} threadzJAn unknown error occurred in the {0} thread main loop, stopping thread.{1}F)opmessage
is_success)r   r%   r;   r#   r7   r   emptyrE   _send_events_in_queuer>   r   format_exceptionr   r   UnhandledError)r   first_eventr@   err_msgs       r   r/   z4SendTelemetryEventsHandler._process_telemetry_thread|   s    9@@AUAUAWXY	_ llnDKK,=,=,?";;= ..{; llnDKK,=,=,?  	_bii$$&(A(A%(HJG+::GX]^^	_s   AB/ B/ /	D8ADDc                    t         j                   j                         }| j                         s2| j                  j	                         dz   | j
                  k  r|| j                  z   t         j                   j                         kD  rt        j                  d| j                  j	                         dz   t         j                   j                         |z
  j                         t        j                  d       | j                         sY| j                  j	                         dz   | j
                  k  r/|| j                  z   t         j                   j                         kD  r| j                  j                  | j                  |             y )N   zMWaiting for events to batch. Total events so far: {0}, Time elapsed: {1} secs)datetimeutcnowr7   r   qsize_MIN_EVENTS_TO_BATCH_MIN_BATCH_WAIT_TIMEr   verboser   timesleepr   report_event_get_events_in_queue)r   rN   
start_times      r   rK   z0SendTelemetryEventsHandler._send_events_in_queue   s"   &&--/
,,.dkk&7&7&9A&=AZAZ%ZT666(:K:K:R:R:T`U
 NNj;;,,.q083D3D3K3K3MPZ3Z2c2ceJJqM ,,.dkk&7&7&9A&=AZAZ%ZT666(:K:K:R:R:T`U 	##D$=$=k$JKr   c              #     K   | | j                   j                         sU	 | j                   j                         }| j                   j                          | | j                   j                         sTy y # t        $ rA}t        j                  dj                  t        j                  |                   Y d }~bd }~ww xY ww)Nz2Some exception when fetching event from queue: {0})
r   rJ   
get_nowaitrD   r>   r   r@   r;   r   rL   )r   rN   r?   r@   s       r   r[   z/SendTelemetryEventsHandler._get_events_in_queue   s     ++##%|..0%%'	 ++##%
  |QXXYaYrYrsxYyz{{|s4   C8A7 C5C7	C 7B<7C<CCN)__name__
__module____qualname____doc__r!   rR   	timedeltar   r=   rU   rV   r   staticmethodr#   r(   r*   r&   r4   r3   r7   rA   rE   r/   rK   r[   r"   r   r   r   r   #   s    
 *L%8%%a088L-8--a8 7 7D#u  _$L|r   r   )rR   r-   rX   azurelinuxagent.commonr   azurelinuxagent.common.eventr   r    azurelinuxagent.common.exceptionr   azurelinuxagent.common.futurer   r   r	   !azurelinuxagent.common.interfacesr
   azurelinuxagent.common.utilsr   r   r   r"   r   r   <module>rk      s;   &    ) F @ < < D 15A|!7 A|r   