celerytasks.py 3.01 KB
Newer Older
1
from celery.utils.log import get_task_logger
2 3
from flask.ext.celery import Celery
from datetime import datetime, timedelta
4
import time
5
from app import app, db
6
from models import Agency, Prediction
7
from nextbus import Nextbus
Anton Sarukhanov's avatar
Anton Sarukhanov committed
8
from lock import LockException
9

10
"""
11
Celery is a task queue for background task processing. We're using it
12 13
for scheduled tasks, which are configured in this file.

14
The task execution schedule can be found/tweaked in config.py.
15 16 17 18
"""


# Create new Celery object with configured broker; get other cfg params
19
celery = Celery(app)
20
celery.conf.update(app.config)
21

22
# This wraps task execution in an app context (for db session, etc)
23 24 25 26 27 28 29 30
TaskBase = celery.Task
class ContextTask(TaskBase):
    abstract = True
    def __call__(self, *args, **kwargs):
        with app.app_context():
            return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask

31 32
logger = get_task_logger(__name__)

33
# Task definitions:
34
@celery.task()
35
def update_agencies():
36 37 38 39
    """
    Refresh our list of Agencies from NextBus
    """
    Nextbus.get_agencies(truncate=True)
40 41

@celery.task()
42
def update_routes(agencies=None):
43 44 45
    """
    Refresh our list of Routes, Stops, and Directions from Nextbus
    """
46 47
    if not agencies:
        agencies = app.config['AGENCIES']
48
    for agency_tag in agencies:
Anton Sarukhanov's avatar
Anton Sarukhanov committed
49
        routes = Nextbus.get_routes(agency_tag, truncate=True)
50
    print("update_routes: Got {0} routes for {1} agencies"\
Anton Sarukhanov's avatar
Anton Sarukhanov committed
51
          .format(len(routes), len(agencies)))
52 53 54

@celery.task()
def update_predictions(agencies=None):
55 56 57 58
    """
    Get the latest vehicle arrival predictions from Nextbus
    """
    start = time.time()
59 60
    if not agencies:
        agencies = app.config['AGENCIES']
Anton Sarukhanov's avatar
Anton Sarukhanov committed
61 62
    predictions = Nextbus.get_predictions(agencies,
                                          truncate=False)
63
    elapsed = time.time() - start
Anton Sarukhanov's avatar
Anton Sarukhanov committed
64 65
    print("Got {0} predictions for {1} agencies in {2:0.2f} seconds."\
          .format(len(predictions), len(agencies), elapsed))
66 67

@celery.task()
68
def update_vehicle_locations(agencies=None):
69 70 71
    """
    Get the latest vehicle locations (coords/speed/heading) from NextBus
    """
72 73 74
    start = time.time()
    if not agencies:
        agencies = app.config['AGENCIES']
Anton Sarukhanov's avatar
Anton Sarukhanov committed
75 76 77 78 79 80 81
    try:
        vl = Nextbus.get_vehicle_locations(agencies,
                                           truncate=False,
                                           max_wait=0)
    except LockException as e:
        print(e)
        return
82
    elapsed = time.time() - start
83
    print("Got {0} vehicle locations for {1} agencies in {2:0.2f} seconds."\
Anton Sarukhanov's avatar
Anton Sarukhanov committed
84
          .format(len(vl), len(agencies), elapsed))
85

86 87 88 89 90 91 92 93

@celery.task()
def delete_stale_predictions():
    """
    Delete predictions older than PREDICTIONS_MAX_AGE.
    """
    delete = Nextbus.delete_stale_predictions()
    print("{0} stale predictions deleted".format(delete))
94 95 96 97 98 99 100 101

@celery.task()
def delete_stale_vehicle_locations():
    """
    Delete vehicle locations older than LOCATIONS_MAX_AGE.
    """
    delete = Nextbus.delete_stale_vehicle_locations()
    print("{0} stale vehicle locations deleted".format(delete))