63 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			63 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
# from __future__ import absolute_import, unicode_literals
 | 
						|
from celery import Task
 | 
						|
from celery import shared_task
 | 
						|
import logging
 | 
						|
from django.conf import settings
 | 
						|
from server.settings import get_sysconfig
 | 
						|
import importlib
 | 
						|
from django.core.cache import cache
 | 
						|
import requests
 | 
						|
 | 
						|
# 实例化myLogger
 | 
						|
myLogger = logging.getLogger('log')
 | 
						|
 | 
						|
@shared_task
 | 
						|
def send_mail_task(**args):
 | 
						|
    config = get_sysconfig()
 | 
						|
    subject = args.get('subject', '500')
 | 
						|
    args['subject'] = '{}:{}_{}_{}'.format(
 | 
						|
        settings.SYS_NAME, settings.SYS_VERSION, config['base']['base_name_short'], subject)
 | 
						|
    args['from_email'] = args.get('from_email', settings.EMAIL_HOST_USER)
 | 
						|
    args['recipient_list'] = args.get(
 | 
						|
        'recipient_list',  settings.EMAIL_DEVELOPERS if hasattr(settings, 'EMAIL_DEVELOPERS') else [settings.EMAIL_HOST_USER])
 | 
						|
    cache_key = f'error_mail_{subject}'
 | 
						|
    email_tuple = cache.get(cache_key)
 | 
						|
    if email_tuple is None:
 | 
						|
        email_tuple = (0, True)
 | 
						|
        cache.set(cache_key, email_tuple, 60)
 | 
						|
    email_count, email_enable = email_tuple
 | 
						|
    if email_enable:
 | 
						|
        email_count += 1
 | 
						|
        if email_count > 4:
 | 
						|
            email_enable = False
 | 
						|
            # 如果频率高于每分钟4封,则自动屏蔽半小时
 | 
						|
            cache.set(cache_key, (email_count, email_enable), 1800)
 | 
						|
            args['subject'] = args['subject'] + '_发送频繁'
 | 
						|
        else:
 | 
						|
            cache.set(cache_key, (email_count, True), 60)
 | 
						|
        if getattr(settings, 'JUMP_MAIL_ENABLED', False):
 | 
						|
            requests.post(settings.JUMP_MAIL_URL, json=args)
 | 
						|
        else:
 | 
						|
            from django.core.mail import send_mail
 | 
						|
            send_mail(**args)
 | 
						|
 | 
						|
class CustomTask(Task):
 | 
						|
    """
 | 
						|
    自定义的任务回调
 | 
						|
    """
 | 
						|
 | 
						|
    def on_failure(self, exc, task_id, args, kwargs, einfo):
 | 
						|
        detail ='Task {0} raised exception: {1!r}\n{2!r}'.format(
 | 
						|
                    task_id, exc, einfo.traceback)
 | 
						|
        myLogger.error(detail)
 | 
						|
        send_mail_task.delay(subject=f'task_error_{self.name}', message=detail)
 | 
						|
        return super().on_failure(exc, task_id, args, kwargs, einfo)
 | 
						|
 | 
						|
@shared_task(base=CustomTask)
 | 
						|
def ctask_run(func_str: str, *args, **kwargs):
 | 
						|
    """通用celery函数/将普通函数转为celery执行/也可直接运行
 | 
						|
    """
 | 
						|
    module, func = func_str.rsplit(".", 1)
 | 
						|
    m = importlib.import_module(module)
 | 
						|
    f = getattr(m, func)
 | 
						|
    f(*args, **kwargs) |