o
    h$                     @   s   d dl Z d dlmZ d dlmZmZ d dlmZ er4d dlmZ d dlm	Z	 d dl
mZmZ d dlmZ G d	d
 d
eZdd Zdd Zdd Zdd Zdd Zdd ZG dd dZG dd deZdd ZdS )    N)Integration)capture_internal_exceptionsensure_integration_enabled)TYPE_CHECKING)Any)Optional)EventHintSparkContextc                   @   s   e Zd ZdZedd ZdS )SparkIntegrationsparkc                   C   s
   t   d S N)_setup_sentry_tracing r   r   l/var/www/html/pro-man-master/venv/lib/python3.10/site-packages/sentry_sdk/integrations/spark/spark_driver.py
setup_once   s   
zSparkIntegration.setup_onceN)__name__
__module____qualname__
identifierstaticmethodr   r   r   r   r   r      s    r   c                  C   s:   ddl m}  | j}|r|d|j |d|j dS dS )z
    Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
    This allows worker integration to have access to app_name and application_id.
    r   r
   sentry_app_namesentry_application_idN)pysparkr   _active_spark_contextsetLocalPropertyappNameapplicationId)r   spark_contextr   r   r   _set_app_properties   s   r    c                 C   s4   ddl m} | j}|| t }| j | dS )zA
    Start java gateway server to add custom `SparkListener`
    r   )ensure_callback_server_startedN)pyspark.java_gatewayr!   _gatewaySentryListener_jscscaddSparkListener)r&   r!   gwlistenerr   r   r   _start_sentry_listener,   s
   r*   c                    s   t  }|j fdd}d S )Nc                    sX  t   t td u r| W  d    S  jd u r$| W  d    S | di d   | di d j	d | d d j	d | d d j	d	 | d d
 j	d | d d j
 | d d j | d d j | d d j | d d j | di d j W d    | S 1 sw   Y  | S )Nuseridtagszexecutor.idzspark.executor.idzspark-submit.deployModezspark.submit.deployModezdriver.hostzspark.driver.hostzdriver.portzspark.driver.portspark_versionapp_nameapplication_idmaster
spark_homeextraweb_url)r   
sentry_sdk
get_clientget_integrationr   r   
setdefault	sparkUser_confgetversionr   r   r1   	sparkHomeuiWebUrl)eventhintr&   r   r   process_event=   s6   


z+_add_event_processor.<locals>.process_event)r5   get_isolation_scopeadd_event_processor)r&   scoperB   r   rA   r   _add_event_processor9   s   rF   c                 C   s   t |  t  t|  d S r   )r*   r    rF   rA   r   r   r   _activate_integration]   s   rG   c                     s2   ddl m}  | j tt  fdd}|| _d S )Nr   r
   c                    s"    | g|R i |}t |  |S r   )rG   )selfargskwargsrvspark_context_initr   r   "_sentry_patched_spark_context_initk   s   zE_patch_spark_context_init.<locals>._sentry_patched_spark_context_init)r   r   _do_initr   r   )r   rN   r   rL   r   _patch_spark_context_inite   s
   
rP   c                  C   s.   ddl m}  | jd urt| j d S t  d S )Nr   r
   )r   r   r   rG   rP   r
   r   r   r   r   u   s
   


r   c                   @   s   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 ZG d1d2 d2Zd3S )4SparkListenerc                 C      d S r   r   )rH   applicationEndr   r   r   onApplicationEnd      zSparkListener.onApplicationEndc                 C   rR   r   r   )rH   applicationStartr   r   r   onApplicationStart   rU   z SparkListener.onApplicationStartc                 C   rR   r   r   )rH   blockManagerAddedr   r   r   onBlockManagerAdded   rU   z!SparkListener.onBlockManagerAddedc                 C   rR   r   r   )rH   blockManagerRemovedr   r   r   onBlockManagerRemoved   rU   z#SparkListener.onBlockManagerRemovedc                 C   rR   r   r   )rH   blockUpdatedr   r   r   onBlockUpdated   rU   zSparkListener.onBlockUpdatedc                 C   rR   r   r   )rH   environmentUpdater   r   r   onEnvironmentUpdate   rU   z!SparkListener.onEnvironmentUpdatec                 C   rR   r   r   )rH   executorAddedr   r   r   onExecutorAdded   rU   zSparkListener.onExecutorAddedc                 C   rR   r   r   )rH   executorBlacklistedr   r   r   onExecutorBlacklisted   rU   z#SparkListener.onExecutorBlacklistedc                 C   rR   r   r   )rH   executorBlacklistedForStager   r   r   onExecutorBlacklistedForStage   s   z+SparkListener.onExecutorBlacklistedForStagec                 C   rR   r   r   )rH   executorMetricsUpdater   r   r   onExecutorMetricsUpdate   rU   z%SparkListener.onExecutorMetricsUpdatec                 C   rR   r   r   )rH   executorRemovedr   r   r   onExecutorRemoved   rU   zSparkListener.onExecutorRemovedc                 C   rR   r   r   )rH   jobEndr   r   r   onJobEnd   rU   zSparkListener.onJobEndc                 C   rR   r   r   )rH   jobStartr   r   r   
onJobStart   rU   zSparkListener.onJobStartc                 C   rR   r   r   )rH   nodeBlacklistedr   r   r   onNodeBlacklisted   rU   zSparkListener.onNodeBlacklistedc                 C   rR   r   r   )rH   nodeBlacklistedForStager   r   r   onNodeBlacklistedForStage   rU   z'SparkListener.onNodeBlacklistedForStagec                 C   rR   r   r   )rH   nodeUnblacklistedr   r   r   onNodeUnblacklisted   rU   z!SparkListener.onNodeUnblacklistedc                 C   rR   r   r   )rH   r?   r   r   r   onOtherEvent   rU   zSparkListener.onOtherEventc                 C   rR   r   r   )rH   speculativeTaskr   r   r   onSpeculativeTaskSubmitted   rU   z(SparkListener.onSpeculativeTaskSubmittedc                 C   rR   r   r   )rH   stageCompletedr   r   r   onStageCompleted   rU   zSparkListener.onStageCompletedc                 C   rR   r   r   )rH   stageSubmittedr   r   r   onStageSubmitted   rU   zSparkListener.onStageSubmittedc                 C   rR   r   r   )rH   taskEndr   r   r   	onTaskEnd   rU   zSparkListener.onTaskEndc                 C   rR   r   r   )rH   taskGettingResultr   r   r   onTaskGettingResult   rU   z!SparkListener.onTaskGettingResultc                 C   rR   r   r   )rH   	taskStartr   r   r   onTaskStart   rU   zSparkListener.onTaskStartc                 C   rR   r   r   )rH   unpersistRDDr   r   r   onUnpersistRDD   rU   zSparkListener.onUnpersistRDDc                   @   s   e Zd ZdgZdS )zSparkListener.Javaz1org.apache.spark.scheduler.SparkListenerInterfaceN)r   r   r   
implementsr   r   r   r   Java   s    
r   N)r   r   r   rT   rW   rY   r[   r]   r_   ra   rc   re   rg   ri   rk   rm   ro   rq   rs   rt   rv   rx   rz   r|   r~   r   r   r   r   r   r   r   rQ      s4    rQ   c                   @   s8   e Zd Z	dddZdd Zdd Zdd	 Zd
d ZdS )r$   Nc                 C   s   t  j|||d d S )Nlevelmessagedata)r5   rC   add_breadcrumb)rH   r   r   r   r   r   r   _add_breadcrumb   s   
zSentryListener._add_breadcrumbc                 C   s2   t    d| }| jd|d t  d S )NzJob {} Startedinfo)r   r   )r5   rC   clear_breadcrumbsformatjobIdr   r    )rH   rl   r   r   r   r   rm      s   
zSentryListener.onJobStartc                 C   sb   d}d}d|   i}|   dkrd}d| }n	d}d| }| j|||d d S )	N resultJobSucceededr   zJob {} EndedwarningzJob {} Failedr   )	jobResulttoStringr   r   r   )rH   rj   r   r   r   r   r   r   rk      s   zSentryListener.onJobEndc                 C   sT   |  }d| }d| i}t|}|d ur||d< | jd||d t  d S )NzStage {} Submittedname	attemptIdr   r   )	stageInfor   stageIdr   _get_attempt_idr   r    )rH   ry   
stage_infor   r   
attempt_idr   r   r   rz   	  s   
zSentryListener.onStageSubmittedc                 C   s   ddl m} | }d}d}d| i}t|}|d ur ||d< z|  |d< d| }d}W n |yE   d	| }d
}Y nw | j	|||d d S )Nr   )Py4JJavaErrorr   r   r   reasonzStage {} Failedr   zStage {} Completedr   r   )
py4j.protocolr   r   r   r   failureReasonr;   r   r   r   )rH   rw   r   r   r   r   r   r   r   r   r   rx     s"   zSentryListener.onStageCompletedr   )r   r   r   r   rm   rk   rz   rx   r   r   r   r   r$      s    
r$   c                 C   s>   z|   W S  ty   Y nw z|  W S  ty   Y d S w r   )r   	ExceptionattemptNumber)r   r   r   r   r   /  s   

r   )r5   sentry_sdk.integrationsr   sentry_sdk.utilsr   r   typingr   r   r   sentry_sdk._typesr   r	   r   r   r   r    r*   rF   rG   rP   r   rQ   r$   r   r   r   r   r   <module>   s&    	$
gI