From bc273f17cd8e08c6944660b8633ba41763f292b2 Mon Sep 17 00:00:00 2001 From: Filipp Lepalaan Date: Sun, 6 Jun 2021 14:33:41 +0300 Subject: Migrate Celery to RQ --- servo/management/commands/scheduler.py | 28 ++++++++++++++++++++++++++++ servo/tasks.py | 24 +++++++++++------------- 2 files changed, 39 insertions(+), 13 deletions(-) create mode 100644 servo/management/commands/scheduler.py (limited to 'servo') diff --git a/servo/management/commands/scheduler.py b/servo/management/commands/scheduler.py new file mode 100644 index 0000000..18018e5 --- /dev/null +++ b/servo/management/commands/scheduler.py @@ -0,0 +1,28 @@ +from datetime import datetime + +import django_rq +from django.core.management.base import BaseCommand +from django_rq.management.commands import rqscheduler + + +from servo.tasks import check_mail + + +scheduler = django_rq.get_scheduler('default') + + +class Command(rqscheduler.Command): + + help = "Start task scheduler" + + def handle(self, *args, **kwargs): + for job in scheduler.get_jobs(): + job.delete() + + scheduler.schedule( + scheduled_time=datetime.utcnow(), # Time for first execution, in UTC timezone + func=check_mail, # Function to be queued + interval=300, # Time before the function is called again, in seconds + ) + + super(Command, self).handle(*args, **kwargs) diff --git a/servo/tasks.py b/servo/tasks.py index bc5bf77..1d55136 100644 --- a/servo/tasks.py +++ b/servo/tasks.py @@ -1,10 +1,7 @@ # -*- coding: utf-8 -*- - -from __future__ import absolute_import - -from email.parser import Parser - -from celery import shared_task +import email +import logging +from email.parser import BytesParser from django.conf import settings from django.core.cache import cache @@ -32,7 +29,6 @@ def get_rules(): return rules -@shared_task def apply_rules(event): """ Applies configured rules @@ -100,7 +96,6 @@ def apply_rules(event): return '%d/%d rules processed' % (counter, len(rules)) -@shared_task def batch_process(user, data): """ /orders/batch @@ -161,24 +156,26 @@ def batch_process(user, data): return '%d/%d orders processed' % (processed, len(orders)) -@shared_task def check_mail(): - """Checks IMAP box for incoming mail""" + """ + Checks IMAP box for incoming mail + """ uid = Configuration.conf('imap_act') if empty(uid): - raise ConfigurationError('Incoming message user not configured') + err = 'User account for incoming messages not configured' + raise ConfigurationError(err) counter = 0 user = User.objects.get(pk=uid) server = Configuration.get_imap_server() + typ, data = server.search(None, "UnSeen") for num in data[0].split(): - #logging.debug("** Processing message %s" % num) typ, data = server.fetch(num, "(RFC822)") # parsestr() seems to return an email.message? - msg = Parser().parsestr(data[0][1]) + msg = BytesParser().parsebytes(data[0][1]) Note.from_email(msg, user) #server.copy(num, 'servo') server.store(num, '+FLAGS', '\\Seen') @@ -188,3 +185,4 @@ def check_mail(): server.logout() return '%d messages processed' % counter + -- cgit v1.2.3