Queue

Job

class connector.queue.job.Job(func=None, model_name=None, args=None, kwargs=None, priority=None, eta=None, job_uuid=None, max_retries=None, description=None)[source]

Bases: object

A Job is a task to execute.

uuid[source]

Id (UUID) of the job.

worker_uuid

When the job is enqueued, UUID of the worker.

state

State of the job: pending, enqueued, started, done or failed. The initial state is pending and the final state is done.
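The state names above and the set_* methods listed further down suggest a simple state holder. The following is a minimal sketch, not the library's implementation: the class name SketchJobState is hypothetical, the method signatures are simplified, and the assumption that each transition stamps the matching date_* attribute is ours.

```python
from datetime import datetime

# State names as listed in the attribute description.
PENDING, ENQUEUED, STARTED, DONE, FAILED = (
    "pending", "enqueued", "started", "done", "failed")


class SketchJobState:
    """Hypothetical stand-in that only tracks a state and its timestamps."""

    def __init__(self):
        self.state = PENDING  # documented initial state
        self.date_enqueued = None
        self.date_started = None
        self.date_done = None
        self.exc_info = None

    def set_enqueued(self):
        self.state = ENQUEUED
        self.date_enqueued = datetime.now()

    def set_started(self):
        self.state = STARTED
        self.date_started = datetime.now()

    def set_done(self):
        self.state = DONE  # documented final state
        self.date_done = datetime.now()

    def set_failed(self, exc_info=None):
        self.state = FAILED
        self.exc_info = exc_info
```

A job that runs without error would go through set_enqueued(), set_started() and set_done() in that order.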

retry

The current try, starts at 0 and each time the job is executed, it increases by 1.

max_retries

The maximum number of retries allowed before the job is considered failed.
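The interaction between retry and max_retries can be sketched as a single check. This is an illustration only: the helper name has_retries_left is hypothetical, and the exact comparison used by the library is an assumption. The "0 means infinite retries" rule comes from the description of the job decorator on this page.

```python
def has_retries_left(retry, max_retries):
    """Return True if a job may be executed again.

    Assumptions: ``retry`` counts the executions so far, and a falsy
    ``max_retries`` (0 or None) means the job is retried forever.
    """
    if not max_retries:
        return True
    return retry < max_retries
```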

func_name

Name of the function (in the form module.function_name).

args

Arguments passed to the function when executed.

kwargs

Keyword arguments passed to the function when executed.

func_string[source]

Full string representing the function to be executed, i.e. module.function(args, kwargs)

description[source]

Human description of the job.

func[source]

The python function itself.

model_name

OpenERP model on which the job will run.

priority

Priority of the job; 0 is the highest priority.

date_created

Date and time when the job was created.

date_enqueued

Date and time when the job was enqueued.

date_started

Date and time when the job was started.

date_done

Date and time when the job was done.

result

A description of the result (for humans).

exc_info

Exception information (traceback) when the job failed.

user_id

ID of the OpenERP user who created the job.

eta[source]

Estimated Time of Arrival of the job. It will not be executed before this date/time.

canceled

True if the job has been canceled.

cancel(msg=None)[source]
perform(session)[source]

Execute the job.

The job is executed with the user which has initiated it.

Parameters: session (ConnectorSession) – session to execute the job
postpone(result=None, seconds=None)[source]

Set the estimated time of arrival (ETA) to n seconds from now. Used when a retryable exception requests that the job be retried later.
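A minimal sketch of what postponing amounts to, assuming the ETA is stored as a datetime. The class PostponableJob, the extra now parameter (added for testability) and the fallback delay ASSUMED_DEFAULT_SECONDS are all hypothetical; the documentation above does not state what happens when seconds is None.

```python
from datetime import datetime, timedelta

# Hypothetical fallback; the real default delay is not documented here.
ASSUMED_DEFAULT_SECONDS = 10 * 60


class PostponableJob:
    """Hypothetical stand-in for a job; only eta and result are modelled."""

    def __init__(self):
        self.eta = None
        self.result = None

    def postpone(self, result=None, seconds=None, now=None):
        if seconds is None:
            seconds = ASSUMED_DEFAULT_SECONDS
        if now is None:
            now = datetime.now()
        # The job must not be executed before this date/time.
        self.eta = now + timedelta(seconds=seconds)
        if result is not None:
            self.result = result
```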

related_action(session)[source]
set_done(result=None)[source]
set_enqueued(worker)[source]
set_failed(exc_info=None)[source]
set_pending(result=None)[source]
set_started()[source]
uuid[source]

Job ID; this is a UUID.

class connector.queue.job.JobStorage[source]

Bases: object

Interface for the storage of jobs

exists(job_uuid)[source]

Returns whether a job still exists in the storage.

load(job_uuid)[source]

Read the job’s data from the storage

store(job)[source]

Store a job
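To make the three-method interface concrete, here is a sketch of an in-memory implementation. It is not part of the connector: SketchJobStorage and InMemoryJobStorage are hypothetical names, and jobs are represented as plain dicts with a "uuid" key purely for illustration.

```python
class SketchJobStorage:
    """Mirror of the documented interface: exists, load, store."""

    def exists(self, job_uuid):
        raise NotImplementedError

    def load(self, job_uuid):
        raise NotImplementedError

    def store(self, job):
        raise NotImplementedError


class InMemoryJobStorage(SketchJobStorage):
    """Hypothetical storage keeping jobs in a dict keyed by UUID."""

    def __init__(self):
        self._jobs = {}

    def exists(self, job_uuid):
        return job_uuid in self._jobs

    def load(self, job_uuid):
        # Return a copy so callers cannot mutate the stored data.
        return dict(self._jobs[job_uuid])

    def store(self, job):
        self._jobs[job["uuid"]] = dict(job)
```

Such a storage could be useful in unit tests, where a database-backed storage is not available.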

class connector.queue.job.OpenERPJobStorage(session)[source]

Bases: connector.queue.job.JobStorage

Store a job on OpenERP

enqueue(func, model_name=None, args=None, kwargs=None, priority=None, eta=None, max_retries=None, description=None)[source]

Create a Job and enqueue it in the queue. Return the job uuid.

This expects the arguments specific to the job to be already extracted from the ones to pass to the job function.

enqueue_resolve_args(func, *args, **kwargs)[source]

Create a Job and enqueue it in the queue. Return the job uuid.

exists(job_uuid)[source]

Returns whether a job still exists in the storage.

load(job_uuid)[source]

Read a job from the Database

openerp_id(job)[source]
store(job)[source]

Store the Job

connector.queue.job.job(func)[source]

Decorator for jobs.

Add a delay attribute on the decorated function.

When delay is called, the function is transformed to a job and stored in the OpenERP queue.job model. The arguments and keyword arguments given in delay will be the arguments used by the decorated function when it is executed.

The delay() function of a job takes the following arguments:

session
Current ConnectorSession
model_name
name of the model on which the job has something to do
*args and **kwargs

Arguments and keyword arguments which will be given to the called function once the job is executed. They should be pickle-able.

There are 4 special, reserved keyword arguments that you can use:

  • priority: priority of the job; the smaller the number, the higher the priority. Default is 10.

  • max_retries: maximum number of retries before giving up and setting the job state to ‘failed’. A value of 0 means infinite retries. Default is 5.

  • eta: the job can be executed only after this datetime (or now + timedelta if a timedelta or integer is given).

  • description: a human description of the job, intended to discriminate job instances (default is the func.__doc__ or ‘Function %s’ % func.__name__).
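The handling of the reserved keyword arguments can be sketched as follows. The helpers split_delay_kwargs and normalize_eta are hypothetical and are not the connector's code. Reading an integer eta as a number of seconds is an assumption based on the example on this page, where eta=60*60*5 is described as a delay of 5 hours.

```python
from datetime import datetime, timedelta

RESERVED = ("priority", "max_retries", "eta", "description")
DEFAULTS = {"priority": 10, "max_retries": 5}  # defaults stated in the list above


def split_delay_kwargs(kwargs):
    """Separate the reserved job options from the function's own kwargs."""
    func_kwargs = dict(kwargs)
    options = {}
    for name in RESERVED:
        if name in func_kwargs:
            options[name] = func_kwargs.pop(name)
    for name, value in DEFAULTS.items():
        options.setdefault(name, value)
    return options, func_kwargs


def normalize_eta(eta, now=None):
    """Turn a datetime, timedelta or integer (assumed seconds) into a datetime."""
    if eta is None or isinstance(eta, datetime):
        return eta
    if now is None:
        now = datetime.now()
    if isinstance(eta, timedelta):
        return now + eta
    return now + timedelta(seconds=eta)
```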

Example:

@job
def export_one_thing(session, model_name, one_thing):
    # work
    # export one_thing
    pass

export_one_thing(session, 'a.model', the_thing_to_export)
# => normal and synchronous function call

export_one_thing.delay(session, 'a.model', the_thing_to_export)
# => the job will be executed as soon as possible

export_one_thing.delay(session, 'a.model', the_thing_to_export,
                       priority=30, eta=60*60*5)
# => the job will be executed with a low priority and not before a
# delay of 5 hours from now

See also: related_action(), which attaches a related action to a job.
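How a decorator can leave the function callable synchronously while adding a delay attribute can be sketched like this. It is an illustration under assumptions, not the connector's decorator: sketch_job and PENDING_JOBS are hypothetical names, and an in-memory list stands in for the queue.job model.

```python
import uuid

# Hypothetical stand-in for the queue.job model.
PENDING_JOBS = []


def sketch_job(func):
    """Add a ``delay`` attribute that records a call instead of running it."""

    def delay(session, model_name, *args, **kwargs):
        job_uuid = str(uuid.uuid4())
        PENDING_JOBS.append({
            "uuid": job_uuid,
            "func": func,
            "model_name": model_name,
            "args": args,
            "kwargs": kwargs,
        })
        return job_uuid

    func.delay = delay
    return func


@sketch_job
def export_one_thing(session, model_name, one_thing):
    return "exported %s from %s" % (one_thing, model_name)
```

Calling export_one_thing(...) still runs the function immediately, while export_one_thing.delay(...) only stores a record that a worker could execute later.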

connector.queue.job.related_action(action=<lambda>, **kwargs)[source]

Attach a Related Action to a job.

A Related Action appears as a button on the OpenERP view. The button executes the action; usually it opens the form view of the record related to the job.

The action must be a callable that accepts the arguments:

session, job, **kwargs

Example usage:

from openerp.tools.translate import _

def related_action_partner(session, job):
    model = job.args[0]
    partner_id = job.args[1]
    # possibly get the real ID if partner_id is a binding ID
    action = {
        'name': _("Partner"),
        'type': 'ir.actions.act_window',
        'res_model': model,
        'view_type': 'form',
        'view_mode': 'form',
        'res_id': partner_id,
    }
    return action

@job
@related_action(action=related_action_partner)
def export_partner(session, model_name, partner_id):
    # ...
    pass

The kwargs are transmitted to the action:

def related_action_product(session, job, extra_arg=1):
    assert extra_arg == 2
    model = job.args[0]
    product_id = job.args[1]

@job
@related_action(action=related_action_product, extra_arg=2)
def export_product(session, model_name, product_id):
    # ...
    pass
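One way the extra keyword arguments could reach the action is by binding them with functools.partial. The sketch below assumes that mechanism; sketch_related_action is a hypothetical name, the job is a plain dict here rather than a Job instance, and the attribute name related_action on the decorated function is also an assumption.

```python
import functools


def sketch_related_action(action, **kwargs):
    """Attach ``action`` to the decorated function, with kwargs pre-bound."""

    def decorate(func):
        func.related_action = functools.partial(action, **kwargs)
        return func

    return decorate


def related_action_product(session, job, extra_arg=1):
    # In this sketch a job is a dict; the documented Job exposes job.args.
    return {"res_model": job["args"][0],
            "res_id": job["args"][1],
            "extra_arg": extra_arg}


@sketch_related_action(action=related_action_product, extra_arg=2)
def export_product(session, model_name, product_id):
    return product_id
```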

Worker

class connector.queue.worker.Worker(db_name, watcher)[source]

Bases: threading.Thread

Post and retrieve jobs from the queue, execute them

enqueue_job_uuid(job_uuid)[source]

Enqueue a job:

It will be executed by the worker as soon as possible (according to the job’s priority).

job_storage_class

alias of OpenERPJobStorage

queue_class

alias of JobsQueue

run()[source]

Worker’s main loop

Check whether the worker still exists in the watcher. When it no longer exists, the loop breaks so that the thread stops properly.

Wait for jobs and execute them sequentially.
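The loop described above can be sketched with the standard library. This is a simplified illustration and not the connector's Worker: SketchWatcher, SketchWorker, the register/forget helpers and the 0.05 second polling timeout are all hypothetical, and jobs are plain callables.

```python
import queue
import threading


class SketchWatcher:
    """Hypothetical watcher that only remembers which workers it references."""

    def __init__(self):
        self._workers = set()
        self._lock = threading.Lock()

    def register(self, worker):
        with self._lock:
            self._workers.add(worker)

    def forget(self, worker):
        with self._lock:
            self._workers.discard(worker)

    def worker_lost(self, worker):
        with self._lock:
            return worker not in self._workers


class SketchWorker(threading.Thread):
    def __init__(self, watcher):
        super().__init__(daemon=True)
        self.watcher = watcher
        self.jobs = queue.Queue()
        self.results = []

    def run(self):
        # Leave the loop as soon as the watcher no longer references us.
        while not self.watcher.worker_lost(self):
            try:
                job = self.jobs.get(timeout=0.05)
            except queue.Empty:
                continue
            self.results.append(job())  # jobs run one after another
            self.jobs.task_done()


watcher = SketchWatcher()
worker = SketchWorker(watcher)
watcher.register(worker)
worker.start()
worker.jobs.put(lambda: "first")
worker.jobs.put(lambda: "second")
worker.jobs.join()      # wait until both jobs have been executed
watcher.forget(worker)  # the worker notices and exits its loop
worker.join(timeout=2)
```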

run_job(job)[source]

Execute a job

class connector.queue.worker.WorkerWatcher[source]

Bases: threading.Thread

Keep watch over the workers and signal that they are alive.

A WorkerWatcher is shared between databases, so only 1 instance is necessary to check the aliveness of the workers for every database.

static available_db_names()[source]

Returns the databases for the server having the connector module installed.

Available means that they can be used by a Worker.

Returns: database names
Return type: list
check_alive(db_name, worker)[source]

Check if the worker is still alive and notify its aliveness. Check if the other workers are still alive; if they are dead, remove them from the worker pool.
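Removing dead workers can be pictured as a filter on "last seen" timestamps. The sketch below is an assumption about the mechanism, not the documented implementation: prune_dead_workers is a hypothetical helper, and treating the worker_timeout value of 300 listed on QueueWorker as a number of seconds is also an assumption.

```python
from datetime import datetime, timedelta

# Mirrors QueueWorker.worker_timeout; the unit (seconds) is assumed.
WORKER_TIMEOUT = 300


def prune_dead_workers(last_seen, now, timeout=WORKER_TIMEOUT):
    """Keep only the workers that reported within ``timeout`` seconds.

    ``last_seen`` maps a worker UUID to the datetime of its last report.
    """
    limit = now - timedelta(seconds=timeout)
    return {uuid: seen for uuid, seen in last_seen.items() if seen >= limit}
```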

run()[source]

WorkerWatcher’s main loop

worker_for_db(db_name)[source]
worker_lost(worker)[source]

Indicate if a worker is no longer referenced by the watcher.

Used by the worker threads to know if they have to exit.

connector.queue.worker.start_service()[source]

Start the watcher

Queue

class connector.queue.queue.JobsQueue[source]

Bases: object

Holds the jobs planned for execution in memory.

The jobs are sorted: the higher a job’s priority, the earlier it is dequeued.

dequeue()[source]

Take the first job according to its priority and return it

enqueue(job)[source]
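A priority-ordered in-memory queue can be sketched with heapq. This is not the JobsQueue implementation: SketchJobsQueue is a hypothetical class, it takes the priority as an explicit argument, it does not block when empty, and the first-in-first-out tie-break between equal priorities is an assumption. As documented for Job.priority, a smaller number means a higher priority.

```python
import heapq
import itertools


class SketchJobsQueue:
    """Hypothetical priority queue: smallest priority number comes out first."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker keeps insertion order

    def enqueue(self, priority, job):
        heapq.heappush(self._heap, (priority, next(self._counter), job))

    def dequeue(self):
        priority, _order, job = heapq.heappop(self._heap)
        return job

    def __len__(self):
        return len(self._heap)
```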

Models

class connector.queue.model.QueueJob(pool, cr)[source]

Bases: openerp.osv.orm.Model

Job status and result

autovacuum(cr, uid, context=None)[source]

Delete all jobs (active or not) that were done more than _removal_interval days ago.

Called from a cron.
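The selection rule can be sketched as a date comparison. The helper jobs_to_vacuum is hypothetical and works on plain dicts with a "date_done" key; the actual method operates on queue.job records and its exact domain is not shown here.

```python
from datetime import datetime, timedelta


def jobs_to_vacuum(jobs, removal_interval, now):
    """Return the jobs whose date_done is older than removal_interval days."""
    deadline = now - timedelta(days=removal_interval)
    return [job for job in jobs
            if job["date_done"] is not None and job["date_done"] < deadline]
```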

button_done(cr, uid, ids, context=None)[source]

Open the related action associated with the job

requeue(cr, uid, ids, context=None)[source]
write(cr, uid, ids, vals, context=None)[source]
class connector.queue.model.QueueWorker(pool, cr)[source]

Bases: openerp.osv.orm.Model

Worker

assign_jobs(cr, uid, max_jobs=None, context=None)[source]

Assign n jobs to the worker of the current process

n is max_jobs or unlimited if max_jobs is None

Parameters: max_jobs (int) – maximal limit of jobs to assign on a worker
assign_then_enqueue(cr, uid, max_jobs=None, context=None)[source]

Assign all the jobs not already assigned to a worker. Then enqueue all the jobs having a worker but not enqueued.

Each operation is atomic.

Warning

cr.commit() is called, which commits the transaction, so always call this method in your own transaction, not in OpenERP’s main transaction.

Parameters: max_jobs (int) – maximal limit of jobs to assign on a worker
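The two steps, assigning and then enqueuing, can be sketched on an in-memory list. All three functions below are hypothetical simplifications that operate on dicts with "worker" and "state" keys. The documented method also calls cr.commit(); this sketch has no database transaction and does not model that.

```python
def sketch_assign_jobs(jobs, worker, max_jobs=None):
    """Give ``worker`` up to ``max_jobs`` unassigned jobs (all if None)."""
    unassigned = [job for job in jobs if job["worker"] is None]
    if max_jobs is not None:
        unassigned = unassigned[:max_jobs]
    for job in unassigned:
        job["worker"] = worker
    return len(unassigned)


def sketch_enqueue_jobs(jobs, worker):
    """Enqueue the jobs that have ``worker`` assigned but are not enqueued."""
    to_enqueue = [job for job in jobs
                  if job["worker"] == worker and job["state"] == "pending"]
    for job in to_enqueue:
        job["state"] = "enqueued"
    return len(to_enqueue)


def sketch_assign_then_enqueue(jobs, worker, max_jobs=None):
    sketch_assign_jobs(jobs, worker, max_jobs=max_jobs)
    return sketch_enqueue_jobs(jobs, worker)
```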
enqueue_jobs(cr, uid, context=None)[source]

Enqueue all the jobs assigned to the worker of the current process

worker_timeout = 300
class connector.queue.model.requeue_job(pool, cr)[source]

Bases: openerp.osv.orm.TransientModel

requeue(cr, uid, ids, context=None)[source]