from __future__ import absolute_import, unicode_literals from celery import shared_task import subprocess from server.settings import DATABASES, BACKUP_PATH, SH_PATH, SD_PWD @shared_task def backup_database(): """ 备份数据库 """ import datetime name = datetime.datetime.now().strftime('%Y%m%d%H%M%S') command = 'echo "{}" | sudo -S pg_dump "user={} password={} dbname={}" > {}/bak_{}.sql'.format( SD_PWD, DATABASES['default']['USER'], DATABASES['default']['PASSWORD'], DATABASES['default']['NAME'], BACKUP_PATH + '/database', name) completed = subprocess.run(command, shell=True, capture_output=True, text=True) return completed @shared_task def reload_server_git(): command = 'bash {}/git_server.sh'.format(SH_PATH) completed = subprocess.run(command, shell=True, capture_output=True, text=True) if completed.returncode == 0: return else: return completed.stdout @shared_task def reload_web_git(): command = 'bash {}/git_web.sh'.format(SH_PATH) subprocess.run(command, shell=True, capture_output=True, text=True) # return completed @shared_task def reload_server_only(): command = 'echo "{}" | sudo supervisorctl reload'.format(SD_PWD) completed = subprocess.run(command, shell=True, capture_output=True, text=True) return completed @shared_task def backup_media(): command = 'bash {}/backup_media.sh'.format(SH_PATH) completed = subprocess.run(command, shell=True, capture_output=True, text=True) return completed