o
    /i~                     @   s,  d dl Z d dlZd dlZd dlmZmZ er"d dlmZ ddlm	Z	 d dl
mZ d dlmZ dZd	d
dedefddZdeeef deeef fddZd	d
defddZd	d
defddZd$d	d
defddZdddeeef fddZd%ddZdddeeef fd d!Zdddeeef fd"d#ZdS )&    N)TYPE_CHECKINGAny)Redis   )
BaseWorker)InvalidJobOperation)Jobzrq:pubsub:%s
connectionr   worker_namecommandc                 K   s0   d|i}|r| | | t| t| dS )a$  
    Sends a command to a worker.
    A command is just a string, available commands are:
        - `shutdown`: Shuts down a worker
        - `kill-horse`: Command for the worker to kill the current working horse
        - `stop-job`: A command for the worker to stop the currently running job

    The command string will be parsed into a dictionary and send to a PubSub Topic.
    Workers listen to the PubSub, and `handle` the specific command.

    Args:
        connection (Redis): A Redis Connection
        worker_name (str): The Job ID
    r   N)updatepublishPUBSUB_CHANNEL_TEMPLATEjsondumps)r	   r
   r   kwargspayload r   J/var/www/html/flask_server/venv/lib/python3.10/site-packages/rq/command.pysend_command   s   
r   r   returnc                 C   s   t | d  S )zd
    Returns a dict of command data

    Args:
        payload (dict): Parses the payload dict.
    data)r   loadsdecode)r   r   r   r   parse_payload&   s   r   c                 C      t | |d dS )z
    Sends a command to shutdown a worker.

    Args:
        connection (Redis): A Redis Connection
        worker_name (str): The Job ID
    shutdownNr   r	   r
   r   r   r   send_shutdown_command0      r   c                 C   r   )z
    Tell worker to kill it's horse

    Args:
        connection (Redis): A Redis Connection
        worker_name (str): The Job ID
    
kill-horseNr   r   r   r   r   send_kill_horse_command;   r    r"   job_idc                 C   s4   t j|| |d}|jstdt| |jd|d dS )z
    Instruct a worker to stop a job

    Args:
        connection (Redis): A Redis Connection
        job_id (str): The Job ID
        serializer (): The serializer
    )r	   
serializerzJob is not currently executingstop-job)r#   N)r   fetchr
   r   r   )r	   r#   r$   jobr   r   r   send_stop_job_commandF   s   	r(   workerr   c                 C   sP   |d dkrt | | dS |d dkrt|  dS |d dkr&t| | dS dS )zParses payload and routes commands to the worker.

    Args:
        worker (Worker): The worker to use
        payload (Dict[Any, Any]): The Payload
    r   r%   r   r!   N)handle_stop_job_commandhandle_shutdown_commandhandle_kill_worker_commandr)   r   r   r   r   handle_commandU   s   r.   c                 C   s&   | j d t }t|tj dS )zUPerform shutdown command.

    Args:
        worker (Worker): The worker to use.
    z1Received shutdown command, sending SIGINT signal.N)loginfoosgetpidkillsignalSIGINT)r)   pidr   r   r   r+   d   s   r+   c                 C   s:   | j d | jr| j d |   dS | j d dS )z
    Stops work horse

    Args:
        worker (Worker): The worker to stop
        payload (Dict[Any, Any]): The payload.
    zReceived kill horse command.zKilling horse...z1Worker is not working, kill horse command ignoredN)r/   r0   	horse_pid
kill_horser-   r   r   r   r,   o   s
   	r,   c                 C   sL   | d}| jd| |r|  |kr|| _|   dS | jd| dS )zHandles stop job command.

    Args:
        worker (Worker): The worker to use
        payload (Dict[Any, Any]): The payload.
    r#   zReceived command to stop job %sz'Not working on job %s, command ignored.N)getr/   debugget_current_job_id_stopped_job_idr8   r0   )r)   r   r#   r   r   r   r*      s   
r*   )N)r)   r   )r   r1   r4   typingr   r   redisr   r)   r   rq.exceptionsr   rq.jobr   r   strr   dictr   r   r"   r(   r.   r+   r,   r*   r   r   r   r   <module>   s&    "

