How to run tasks asynchronously

Hi,
I am trying to develop a mechanism that retrieves tasks and runs them in the background. Usually these tasks are something like calling a view in a given context or calling a certain method bound to a context. Before execution a transaction should be started, and at the end there should be a commit.

Before
In my last project I created a few tables in an Oracle database, because it was also our RelStorage backend at the same time. Jobs were then defined from within Plone and added to the database, which had its own transactions. In the background a cronjob constantly called a dedicated client using curl/wget, which retrieved the jobs from the table and executed them in the correct context and as the original user. Using a bit of jQuery and Ajax I was able to create a viewlet which showed the jobs in the queue.

Some ideas
Then a few weeks ago I created something similar that runs in a separate thread in the same client process. It looks like this:

import requests
import threading
import queue
requestQueue = queue.Queue()

def requestWorker():
    while True:
        request = requestQueue.get()
        request['sessionMethod'](*request['args'], **request['kwargs'])
        requestQueue.task_done()

def addRequest(sessionMethod, *args, **kwargs):
    requestQueue.put(dict(
        sessionMethod=sessionMethod,
        args=args,
        kwargs=kwargs
    ))

threading.Thread(target = requestWorker, daemon=True).start()

And could be called like this:

session = requests.Session()
addRequest(session.post, 'http://plone.org', data={'some': 'data'}, timeout=15)

Now I am able to add background requests to arbitrary URLs with some POST data without interrupting a usual Plone view request. If a request takes a long time, that is no problem: the user gets their response immediately. Of course, they do not know whether the background request was successful, but that was not the goal this time. The requests were allowed to fail.
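If the outcome of a background call does matter, one option is to hand the work to a concurrent.futures.ThreadPoolExecutor, which returns a Future per task. The following is only a minimal sketch, assuming a single worker thread like the queue above; add_task and flaky are hypothetical names used for illustration and are not part of the code shown earlier.

```python
from concurrent.futures import ThreadPoolExecutor

# One worker thread, so tasks run one after another like the queue-based worker.
executor = ThreadPoolExecutor(max_workers=1)


def add_task(func, *args, **kwargs):
    """Schedule func in the background and return a Future for its outcome."""
    return executor.submit(func, *args, **kwargs)


def flaky(value):
    # Hypothetical stand-in for a request that may fail.
    if value < 0:
        raise ValueError("negative value")
    return value * 2


ok = add_task(flaky, 21)
bad = add_task(flaky, -1)

# result() blocks until the task is done and returns its return value.
print(ok.result(timeout=5))
# exception() returns the exception raised inside the task instead of re-raising it.
print(repr(bad.exception(timeout=5)))
```

The caller can keep the Future and inspect it later, so a failed task no longer goes unnoticed.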

What I really need
So, now I need something like what I described in the first section, but I do not want to use cronjobs, bash scripts, external curl/wget processes, dedicated database tables and dedicated clients. I need something like another thread running in parallel with the client itself, whose only job is to get a request and run it asynchronously. I want to add the current environment (user, permissions, etc.), the original request (or a modified one) and the context to the requestQueue, and the background thread should be able to run it accordingly. I know that a zope2instance can have multiple worker threads, so my question is how I can create one of these workers dynamically from within my add-on and push requests to it.

Btw., it looks like my topic is quite similar to this one: Asynchronous tasks with Plone 5.x and WSGI

Maybe dm.zodb.asynchronous could be helpful. Its primary purpose is to support asynchronous activity initiated from a request, but it also has more general asynchronous infrastructure, e.g. a transactional decorator that turns a function into a transactional function.

Thank you @dieter,
at first glance this sounds just like what I need. Can I also create a view around it which shows what the current queue looks like?
I will now read more about it in detail.

The central entity to support "long running" requests is the TransactionalScheduler. It schedules an asynchronous task (run in a separate thread) and returns a task id. Followup requests can use this id to check whether the task is finished and in this case to access the result.

From this description, you can see that there is no queue: each task is executed in its own thread. But, of course, you could enhance the TransactionalScheduler to present information about the currently running tasks.
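The pattern described above (run each task in its own thread, hand back an id, let a later request ask for the result by id) can be sketched with the standard library alone. This is a toy illustration and NOT the dm.zodb.asynchronous implementation: the class SimpleScheduler and its wait method are hypothetical, and the sketch handles neither transactions nor ZODB connections.

```python
import threading
import uuid


class SimpleScheduler:
    """Toy stand-in for the idea described above; not the dm.zodb.asynchronous API."""

    def __init__(self):
        self._lock = threading.Lock()
        self._threads = {}   # task id -> thread running the task
        self._results = {}   # task id -> ("ok", value) or ("error", exception)

    def schedule(self, func, *args, **kwargs):
        """Start func in its own thread and return a task id."""
        task_id = uuid.uuid4().hex
        thread = threading.Thread(
            target=self._run, args=(task_id, func, args, kwargs), daemon=True
        )
        with self._lock:
            self._threads[task_id] = thread
        thread.start()
        return task_id

    def _run(self, task_id, func, args, kwargs):
        try:
            outcome = ("ok", func(*args, **kwargs))
        except Exception as exc:
            outcome = ("error", exc)
        with self._lock:
            self._results[task_id] = outcome

    def completed(self, task_id):
        """Return True once the task has finished (successfully or not)."""
        with self._lock:
            return task_id in self._results

    def wait(self, task_id, timeout=None):
        """Block until the task's thread has ended (used here for the demo)."""
        self._threads[task_id].join(timeout)

    def get_result(self, task_id):
        """Return the task's result once, re-raising its exception if it failed."""
        with self._lock:
            status, value = self._results.pop(task_id)
            self._threads.pop(task_id, None)
        if status == "error":
            raise value
        return value


# A module-level instance, so it outlives the request that scheduled the task.
scheduler = SimpleScheduler()
task_id = scheduler.schedule(sum, [1, 2, 3])
scheduler.wait(task_id, timeout=5)
if scheduler.completed(task_id):
    print(scheduler.get_result(task_id))
```

In a web application the first request would return task_id to the browser, and a follow-up request would call completed(task_id) instead of waiting.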

The package works at the ZODB level (as indicated by its name). It has no direct support for Zope's "view" concept. But you can use ZPublisher.BaseRequest.BaseRequest.traverse to set up a (mostly standard) web request context in your transactional function. With such a request, views work as usual.

In my case I need to fetch some data from a SQL database and then create a bunch of Plone objects: some custom content types and also some File objects. If the user requests fewer than 4 objects, I usually create them synchronously. If the user requests more, I add these tasks to the job queue and create them in the background, so that he/she can proceed with other work and come back to the previous context once all background jobs have completed and all objects have been created.

So that's my use case.

Once you have set up the correct context, you can (in principle) do all you want. If the files' content comes from the request, you may need to copy it into files you control yourself.

@dieter Somehow it seems not possible to get the portal object out of thin air. I am getting this error:

    site   = api.portal.get()
  File "/home/ngoeddel/plone/buildout-cache/eggs/plone.api-1.10.2-py3.8.egg/plone/api/portal.py", line 81, in get
    raise CannotGetPortalError(
plone.api.exc.CannotGetPortalError: Unable to get the portal object. More info on http://docs.plone.org/develop/plone.api/docs/api/exceptions.html#plone.api.exc.CannotGetPortalError

Is there another way to get it if the only thing I have is an arbitrary content object (e.g. a Folder/Item) wrapped in PersistentContext()?

Okay, I tried to get the site and set it again inside the asynchronous method:

import transaction
from dm.zodb.asynchronous.zope2 import transactional, PersistentContext
from dm.zodb.asynchronous.scheduler import TransactionalScheduler
from zope.component.hooks import getSite, setSite

class MyObject(Container):

    def doSomething(self):
        """ """
        s = TransactionalScheduler()

        sid = s.schedule(MyObject.async_updateAllVariablesFromDB, PersistentContext(self, getSite()))
        
        transaction.commit()
        
        return sid

    @staticmethod
    @transactional
    def async_updateAllVariablesFromDB(context):
        print("async_updateAllVariablesFromDB")
        setSite(context[1])
        return context[0].updateAllVariablesFromDB()

    def updateAllVariablesFromDB(self):
        site   = getSite() #api.portal.get()

Note that the updateAllVariablesFromDB method is much more complicated and there are other classes involved too. The real call to getSite() is somewhere else.

After that I was able to use getSite() again, but plone.api.portal.get() results in this exception:

    site   = api.portal.get()
  File "/home/ngoeddel/plone/buildout-cache/eggs/plone.api-1.10.2-py3.8.egg/plone/api/portal.py", line 77, in get
    for potential_portal in closest_site.aq_chain:
  File "/home/ngoeddel/plone/buildout-cache/eggs/Products.CMFCore-2.4.8-py3.8.egg/Products/CMFCore/Skinnable.py", line 83, in __getattr__
    raise AttributeError(name)
AttributeError: aq_chain

However, I am now running into another issue. It seems that it is not possible to use Products.CMFCore.utils._checkPermission(). I guess this happens because there is no current request object? Can I simulate that? Or do I have to check all possible permissions beforehand and let the asynchronous job do its thing without any permission checking?

Make the TransactionalScheduler a global object (e.g. a global utility). Otherwise, it may not live long enough.

The _checkPermission() problem happens because there is no "security manager".

A security manager is usually set up during authentication at the end of traversal, implemented by ZPublisher.BaseRequest.BaseRequest.traverse. It can be set up manually via AccessControl.SecurityManagement.newSecurityManager (note that newSecurityManager has a strange signature - its first parameter is request, not user; if you do not have a request object, you can pass None).

Note that in order to use views, you need a (new) request (the old one is outside of your control and will be closed automatically at the end of request processing). If you are able to provide the necessary information for authentication, it is likely best to set up the context via BaseRequest.traverse (rather than to set up a security manager manually). This way, you get quite a standard (web request) context. Note that in order to use request.traverse, your request must be set up properly. You can use Testing.makerequest.makerequest for the elementary preparation (you may need additional preparation to let authentication succeed). makerequest needs an "application object" (i.e. the Zope root object). You can pass it either via a Context object (--> dm.zodb.asynchronous) or create a new one (look into ZPublisher to find out one method to do this).

collective.taskqueue or plone.app.async (although the latter most likely won't get updated to Python 3)

These days I tend to pass the work to a Celery worker and use restapi for communication between the worker and Plone. I experimented with this in collective/collective.elastic.plone (Plone part of ElasticSearch index) and collective/collective.elastic.ingest (Plone Elastic Search Integration: ingestion service queue runner between Plone RestAPI and ElasticSearch), both on GitHub.
I am very happy with the result here and can imagine this could work very well in a generic way too.

I am not convinced by the concept of opening extra threads: resource planning in a more complex setup is difficult, and other performance-related side effects creep in as well.

Having (an) independent Celery worker(s) opens new possibilities for integration with third party services as well, w/o the overhead of a whole Plone instance. For the Celery worker, Plone and its RestAPI is just another service here.

Additionally, in a containerized world (Docker/k8s) the worker runs in its own space, independent of Plone.
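To illustrate the idea that the worker sees Plone as just another HTTP service, here is a sketch that builds a plone.restapi content-creation request using only the standard library. The URL, credentials and title are placeholders, build_create_request is a hypothetical helper, and Celery itself is left out; in a real setup a function like this would run inside a worker task.

```python
import base64
import json
import urllib.request


def build_create_request(container_url, title, username, password):
    """Build (but do not send) a POST that asks plone.restapi to create a Document."""
    payload = json.dumps({"@type": "Document", "title": title}).encode("utf-8")
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        container_url,
        data=payload,
        method="POST",
        headers={
            # plone.restapi answers with JSON when asked for application/json.
            "Accept": "application/json",
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


# Placeholder values; replace with a real site, folder and account.
request = build_create_request(
    "http://localhost:8080/Plone/folder", "Created by a worker", "admin", "secret"
)
# A worker would now send it, for example with:
# urllib.request.urlopen(request, timeout=15)
```

Because the worker only speaks HTTP, it needs no Plone or Zope code installed and can be scaled independently of the Plone instances.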

Hi @dieter. Thank you for your help so far. I have another question regarding PersistentTransactionalScheduler. I definitely want to store the schedules in the database so that load balancing is possible and different Plone instances can see the scheduled tasks of other instances. But I am not sure how to use it. The docstring says I need to define PERSISTENT_CONTEXT_FACTORY in the derived class. What does that factory have to look like, or do you have a small example? I learn best by example.

The docstring tells you that it must be a class derived from context.PersistentContext. This is an abstract class because the ZODB does not specify how the root database can be obtained. In zope2, you find a concrete class which is adequate for a Zope application.

As an example, you can look at the test class TestPersistentTransactionalScheduler in tests.test_zope2 (if a package has tests, it is always a good idea to look there for examples).

Hi @dieter,
I just got this error:

Exception ignored in thread started by: <bound method TransactionalScheduler._run_thread of <dm.zodb.asynchronous.scheduler.TransactionalScheduler object at 0x7f16a2358bb0>>
Traceback (most recent call last):
  File "/home/ngoeddel/plone/buildout-cache/eggs/dm.zodb.asynchronous-2.2-py3.8.egg/dm/zodb/asynchronous/scheduler.py", line 149, in _run_thread
    self._complete(schedule.id)
  File "/home/ngoeddel/plone/buildout-cache/eggs/dm.zodb.asynchronous-2.2-py3.8.egg/dm/zodb/asynchronous/scheduler.py", line 119, in _complete
    for cid, t in completed.items():
RuntimeError: dictionary changed size during iteration

Is that something that could be solved with threading.Lock?

It is more a Python 3 incompatibility: with Python 2, items returns a list (atomically); with Python 3, it returns a view that is iterated lazily. You could try:

def items_as_list(d):
    """Return *d*'s items as a list."""
    while True:
        try:
            return list(d.items())
        except RuntimeError:
            # d changed size while being copied; try again
            pass

Then replace completed.items() by items_as_list(completed)
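The underlying behaviour can be reproduced in a single thread. In the scheduler the modification comes from another thread, but the mechanism is the same. A small illustration with made-up dictionary contents:

```python
d = {"a": 1, "b": 2}

# Iterating the live items view while the dict grows raises RuntimeError in Python 3.
try:
    for key, value in d.items():
        d[key + "_copy"] = value
    error = None
except RuntimeError as exc:
    error = exc

print(type(error).__name__)

# Iterating over a list snapshot is safe: the list does not change when the dict does.
d = {"a": 1, "b": 2}
for key, value in list(d.items()):
    d[key + "_copy"] = value

print(sorted(d))
```

With several threads, building the snapshot itself can still be interrupted by a concurrent change, which is why items_as_list retries on RuntimeError.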

In an upcoming release (2.3), I will work around this incompatibility by defining MAPPING_FACTORY (in TransactionalScheduler) as AtomicItemsDict (rather than dict). For Python 2, this is just dict; for Python 3, it derives from dict but overrides items to return a safely iterable list. No changes in TransactionalScheduler._complete.

Maybe, you can implement this in your local copy and report back about your experiences.

This seems to work for now.

But I got a different issue now:

  Module dm.zodb.asynchronous.scheduler, line 180, in schedule
  Module dm.zodb.asynchronous.context, line 104, in __init__
  Module dm.zodb.asynchronous.context, line 124, in identify
ValueError: obj <dm.zodb.asynchronous.scheduler._PersistentSchedule object at 0x7f25db25c040> not part of the graph of persistent objects

This happens because the _PersistentSchedule instance seems to have no connection, I guess?

objs = (<satzweiss.tools.asynchronous.decorator.InfoScheduler object at 0x7f25db8077b0 oid 0x8998b in <Connection at 7f25e43f7220>>, <dm.zodb.asynchronous.scheduler._PersistentSchedule object at 0x7f25db25c040>)

This is my InfoScheduler:

from dm.zodb.asynchronous.scheduler import _PersistentSchedule
from dm.zodb.asynchronous.zope2 import PersistentTransactionalScheduler

class AtomicItemsDict(dict):
    """ We derive from dict and reimplement dict.items for thread safety reasons.
    """
    def items(self):
        while True:
            try:
                return list(super().items())
            except RuntimeError:
                pass

class InfoScheduler(PersistentTransactionalScheduler):
    SCHEDULE_FACTORY = _PersistentSchedule
    MAPPING_FACTORY = AtomicItemsDict

I do not know enough about using raw Persistent objects to understand the underlying problem. Do you need more information or can you guess what happened here?

Small edit:
My idea in the end was also to inherit from _PersistentSchedule and add an info field to it which I can then use as SCHEDULE_FACTORY.

class PersistentInfoSchedule(_PersistentSchedule):
    def setInfo(self, info):
        self._info = info
    
    def getInfo(self):
        return getattr(self, '_info', None)
    
    def getId(self):
        return self.id

A PersistentTransactionalScheduler must be stored in the ZODB. Unlike a (normal) TransactionalScheduler it cannot be a simple (global) object.

The storage in the ZODB ensures that the scheduler (and its managed schedules) is accessible by several processes (via ZEO) and controls their concurrent access.

I recommend a TransactionalScheduler for use by a single process, and a PersistentTransactionalScheduler for use by several processes (when other means do not guarantee that follow-up requests always arrive at the same process). When I use a PersistentTransactionalScheduler, I register it as a local persistent utility.

Be aware that a Plone bug causes problems if a "site" (an object with a local component registry) is above a Plone portal. A Plone portal is always a "site" (where local persistent utilities (among others) can be registered).

I store it in the ZODB. I first tried this:

from plone import api
from dm.zodb.asynchronous.zope2 import PersistentTransactionalScheduler
api.portal.get().aq_parent['my_scheduler'] = PersistentTransactionalScheduler()

But after a transaction.commit() there is this error:

*** AttributeError: 'PersistentTransactionalScheduler' object has no attribute 'manage_fixupOwnershipAfterAdd'

I also tried to store it directly under the current site but I got the same error.

Now I did this:

SCHEDULER_NAME = "sw_scheduler"

def getScheduler():
    root = api.portal.get()._p_jar.root()
    scheduler = root.get(SCHEDULER_NAME, None)
    if not isinstance(scheduler, PersistentTransactionalScheduler):
        root[SCHEDULER_NAME] = PersistentTransactionalScheduler()
    
    return root[SCHEDULER_NAME]

This works but it does not look like the best idea I ever had. :smiley:

How else can I register it as a persistent utility then? At the moment this is still a bit of voodoo to me.
