There is a module on PyPI, celery_once, that addresses duplicate submission and queue management; I have not tried it yet.
Because of a business requirement, I used Celery's eta (estimated time of arrival) option, passing a datetime value so that a task executes at a specific point in time.
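A minimal sketch of how such a task is submitted (the task name `send_reminder` and the argument are illustrative, not from my project):

```python
from datetime import datetime, timedelta, timezone

# An ETA roughly 24 hours in the future, as in my use case:
eta = datetime.now(timezone.utc) + timedelta(hours=24)

# With a real Celery app and task this would be:
# send_reminder.apply_async(args=(object_id,), eta=eta)
#
# eta takes an absolute datetime; countdown is the relative-seconds alternative:
# send_reminder.apply_async(args=(object_id,), countdown=24 * 60 * 60)
print(eta.isoformat())
```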
Inspecting the task schedule from the Django project root, I saw many duplicate eta tasks submitted to the worker.
The command:

```shell
celery -A <directory containing celery.py> inspect scheduled
```
The duplicates all share the same task_id, which rules out repeated submission caused by my own code logic.
In short, Celery's support for tasks scheduled at a specific time (ETA/countdown/retry) is incomplete: a deferred execution time conflicts with the broker's own redelivery mechanism.
When a task has not been acknowledged by a worker within the visibility timeout, the Redis broker redelivers it. The ETA tasks in my project often execute 24 hours later, while the default visibility timeout is 1 hour. So in theory, before the ETA arrives, the task is resubmitted to a worker roughly once every hour.
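The arithmetic above can be sanity-checked with a few lines of plain Python (a rough model, ignoring scheduling jitter):

```python
# Rough estimate of duplicate deliveries for an ETA task on the Redis broker:
# every `visibility_timeout` seconds before the ETA arrives, the
# still-unacknowledged message is redelivered to a worker.
def estimated_redeliveries(eta_delay_s: int, visibility_timeout_s: int) -> int:
    """How many times the message is redelivered before the ETA arrives."""
    return max(eta_delay_s // visibility_timeout_s, 0)

print(estimated_redeliveries(24 * 3600, 3600))    # 24h ETA, 1h timeout -> 24
print(estimated_redeliveries(24 * 3600, 172800))  # 48h visibility timeout -> 0
```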
Visibility timeout
If a task isn’t acknowledged within the Visibility Timeout the task will be redelivered to another worker and executed.
This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.
So you have to increase the visibility timeout to match the time of the longest ETA you’re planning to use.
Note that Celery will redeliver messages at worker shutdown, so having a long visibility timeout will only delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers.
Periodic tasks won’t be affected by the visibility timeout, as this is a concept separate from ETA/countdown.
You can increase this timeout by configuring a transport option with the same name:
app.conf.broker_transport_options = {'visibility_timeout': 43200}
The value must be an int describing the number of seconds.
So I configured visibility_timeout manually in Django's settings file, with a value no smaller than the longest ETA interval:

```python
CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 172800}  # 48 hours
```
## Results
Checking the Celery schedule again, the duplicate tasks were indeed gone. In theory, though, this setting effectively disables Celery's redelivery mechanism: if inter-process communication fails, lost tasks will no longer be detected and redelivered promptly.
The next day I checked the schedule status again and found duplicate tasks were still present, meaning tasks were still being resubmitted even after the configuration change.
1. Use Redis as Django's cache:
```python
# settings.py
CACHES = {
    "default": {
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/1",
        "TIMEOUT": 7 * 24 * 60 * 60,
        "OPTIONS": {
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
        }
    }
}
```
2. Add a lock inside the tasks that get submitted repeatedly.
2.1 Use a unique identifier as the key (e.g. task name + the object_id being operated on), together with Redis's atomic SETNX (SET if Not eXists): before running the business logic, check whether the key already exists in the cache; if it does, the task returns immediately without executing.
2.2 In django-redis this is done with cache.set(key, value, timeout, nx=True).
2.3 If the key does not exist, the call writes key:value and returns True, meaning this is the task's first execution.
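The logic of steps 2.1–2.3 can be simulated without a running Redis. A dict-backed stand-in for django-redis's `set(..., nx=True)` (names here are illustrative) shows why only the first delivery executes:

```python
import time

class FakeCache:
    """Dict-backed stand-in for django-redis: set(..., nx=True) mimics SETNX."""
    def __init__(self):
        self._data = {}  # key -> (value, expiry_timestamp)

    def set(self, key, value, timeout, nx=False):
        now = time.time()
        current = self._data.get(key)
        if nx and current is not None and current[1] > now:
            return False  # key exists and is unexpired: lock not acquired
        self._data[key] = (value, now + timeout)
        return True

cache = FakeCache()

def example_task(object_id):
    flag = 'example_task' + str(object_id)
    if not cache.set(flag, 1, 60, nx=True):
        return 'skipped'   # duplicate delivery: lock already held
    return 'executed'      # first delivery wins the lock

print(example_task(42))  # executed
print(example_task(42))  # skipped (duplicate within the 60 s window)
```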
The code looks roughly like this:

```python
# tasks.py
from django.core.cache import cache

def example_task(object_id):
    flag = 'example_task' + str(object_id)
    nx_lock = cache.set(flag, 1, 60, nx=True)  # key expires after 60 seconds
    if not nx_lock:
        print("task has been locked")
        return
    # ... business logic ...
```
This shields the business logic from the effects of duplicate tasks, but it still does not fix the root cause. If you have a better solution, please share it. Thanks.