diff --git a/apps/utils/models.py b/apps/utils/models.py index b68031b6..1e715a9b 100755 --- a/apps/utils/models.py +++ b/apps/utils/models.py @@ -8,6 +8,7 @@ from django.db import IntegrityError from django.db import transaction from rest_framework.exceptions import ParseError from django.core.cache import cache +from django.db import transaction, connection import hashlib # 自定义软删除查询基类 @@ -115,25 +116,39 @@ class BaseModel(models.Model): @classmethod def safe_get_or_create(cls, defaults=None, **kwargs): + """ + 多进程/多服务器安全的 get_or_create + - 数据库唯一约束不够时,用 Redis 锁防止重复创建 + - 在事务中使用 select_for_update + """ defaults = defaults or {} - - for i in range(3): - try: - obj = cls.objects.create(**kwargs, **defaults) - return obj, True - except IntegrityError: - try: - # 有别的进程抢先创建了,直接 get - obj = cls.objects.get(**kwargs) - return obj, False - except cls.DoesNotExist: - # 极端并发情况下 create 和 get 都失败,稍微等一下重试 - time.sleep(0.001 * (i + 1)) # 微等一下避免竞争 - continue + create_kwargs = {**kwargs, **defaults} - # 最终重试失败(极罕见) - obj = cls.objects.get(**kwargs) - return obj, False + for attempt in range(3): + try: + if connection.in_atomic_block: + # 在事务中,先锁定再获取 + try: + obj = cls.objects.select_for_update().get(**kwargs) + return obj, False + except cls.DoesNotExist: + obj = cls(**create_kwargs) + obj.save() + return obj, True + else: + # 非事务,使用分布式锁 + sorted_kwargs = dict(sorted(create_kwargs.items())) + lock_hash = hashlib.md5(str(sorted_kwargs).encode()).hexdigest() + lock_key = f"safe_get_or_create:{cls.__name__}:{lock_hash}" + + with cache.lock(lock_key, timeout=10): + return cls.objects.get_or_create(**kwargs, defaults=defaults) + + except IntegrityError: + # 唯一约束冲突,重试 + if attempt == 2: + raise + time.sleep(0.1 * (attempt + 1)) def handle_parent(self):