from __future__ import absolute_import, unicode_literals

import logging
import subprocess

from celery import shared_task

from server.settings import DATABASES, BACKUP_PATH, SH_PATH, SD_PWD

myLogger = logging.getLogger('log')


def _run(argv, stdin_text=None, stdout=subprocess.PIPE):
    """Run *argv* without a shell and return the ``CompletedProcess``.

    Args:
        argv: Program and arguments as a list; nothing is shell-interpreted.
        stdin_text: Optional text written to the child's stdin. When ``None``
            the child inherits this process's stdin.
        stdout: Destination for the child's stdout (captured by default).

    Returns:
        The ``subprocess.CompletedProcess`` with text-mode output. If the
        program cannot be started at all, a synthetic result with return code
        127 and the OS error text in ``stderr`` is returned instead of raising,
        so callers keep reporting failures through the return value.
    """
    try:
        return subprocess.run(
            argv,
            input=stdin_text,
            stdout=stdout,
            stderr=subprocess.PIPE,
            text=True,
        )
    except OSError as err:
        # 127 mirrors the shell's "command not found" status.
        return subprocess.CompletedProcess(argv, 127, stdout="", stderr=str(err))


def _sudo(argv, stdout=subprocess.PIPE):
    """Run *argv* under ``sudo -S``, feeding ``SD_PWD`` on stdin.

    Passing the password on stdin keeps it out of the process list and avoids
    any shell interpretation of special characters it may contain.
    """
    return _run(["sudo", "-S", *argv], stdin_text="{}\n".format(SD_PWD), stdout=stdout)


def _stderr_if_failed(completed):
    """Return the captured stderr for a non-zero exit status, else ``None``."""
    if completed.returncode != 0:
        return completed.stderr
    return None


@shared_task
def backup_database():
    """Back up the database.

    Dumps the ``default`` database with ``pg_dump`` (run through sudo) into
    ``BACKUP_PATH/database/bak_<timestamp>.sql``.

    Returns:
        ``None`` on success; otherwise the error text (pg_dump/sudo stderr, or
        the OS error if the dump file could not be opened).
    """
    import datetime

    name = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    db = DATABASES["default"]
    # NOTE(review): the database password still travels in pg_dump's argv as
    # part of the connection string; consider a ~/.pgpass entry instead.
    conninfo = "user={} password={} dbname={}".format(
        db["USER"], db["PASSWORD"], db["NAME"]
    )
    target = "{}/database/bak_{}.sql".format(BACKUP_PATH, name)
    try:
        # The file is opened by this process (not by sudo), exactly as the
        # shell redirection did before.
        with open(target, "wb") as dump_file:
            completed = _sudo(["pg_dump", conninfo], stdout=dump_file)
    except OSError as err:
        return str(err)
    return _stderr_if_failed(completed)


@shared_task
def reload_server_git():
    """Run ``SH_PATH/git_server.sh`` with bash.

    Returns:
        ``None`` on success, otherwise the script's stderr.
    """
    script = "{}/git_server.sh".format(SH_PATH)
    myLogger.info("reload_server_git: %s", "bash {}".format(script))
    return _stderr_if_failed(_run(["bash", script]))


@shared_task
def reload_web_git():
    """Run ``SH_PATH/git_web.sh`` with bash.

    Returns:
        ``None`` on success, otherwise the script's stderr.
    """
    return _stderr_if_failed(_run(["bash", "{}/git_web.sh".format(SH_PATH)]))


@shared_task
def reload_server_only():
    """Run ``supervisorctl reload`` through sudo.

    Returns:
        The ``subprocess.CompletedProcess`` of the call (return code, stdout
        and stderr). Its ``args`` no longer contains the sudo password.
    """
    # NOTE(review): unlike the sibling tasks this returns the CompletedProcess
    # itself, which may not be storable by a JSON result backend - confirm
    # which result serializer is configured before relying on the result.
    return _sudo(["supervisorctl", "reload"])


@shared_task
def backup_media():
    """Run ``SH_PATH/backup_media.sh`` with bash.

    Returns:
        ``None`` on success, otherwise the script's stderr.
    """
    return _stderr_if_failed(_run(["bash", "{}/backup_media.sh".format(SH_PATH)]))


@shared_task
def test_task():
    """Print ``test_1`` .. ``test_299``, sleeping one second after each line."""
    import time

    for x in range(1, 300):
        print(f"test_{x}")
        time.sleep(1)