How to run tasks asynchronously

Okay, I think I have a utility now. It looks like this:

./utilities/scheduler.py

from zope.interface import implementer

from dm.zodb.asynchronous.scheduler import _PersistentSchedule
from dm.zodb.asynchronous.zope2 import PersistentTransactionalScheduler

from satzweiss.tools.asynchronous.interfaces import IScheduler


class PersistentInfoSchedule(_PersistentSchedule):
    def setInfo(self, info):
        self._info = info
    
    def getInfo(self):
        return getattr(self, '_info', None)
    
    def getId(self):
        return self.id

class AtomicItemsDict(dict):
    """ We derive from dict and reimplement dict.items for thread safety reasons.
    """
    def items(self):
        while True:
            try:
                return list(super().items())
            except RuntimeError:
                pass

@implementer(IScheduler)
class InfoScheduler(PersistentTransactionalScheduler):
    SCHEDULE_FACTORY = PersistentInfoSchedule
    MAPPING_FACTORY = AtomicItemsDict

    def schedule(self, f, *args, **kw):
        info = kw.pop('info', None)
        sid = super().schedule(f, *args, **kw)
        sch = self._schedules[sid]
        sch.setInfo(info)

        return sid
    
    def getSchedules(self):
        result = {}
        for key, sch in self._schedules.items():
            r, exc = sch.get_result()
            result[key] = dict(
                completed=key in self._completed,
                result=r,
                exception=exc
            )
        return result
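The retry loop in AtomicItemsDict.items() is there to absorb the RuntimeError that CPython raises when a dict changes size while it is being iterated. In the scheduler that mutation would come from another thread adding or removing a schedule. The single-threaded sketch below only reproduces the error and shows that iterating over a list snapshot avoids it. The two function names are hypothetical.

```python
# Why AtomicItemsDict.items() retries on RuntimeError: CPython raises it
# when a dict changes size while it is being iterated.

def mutate_while_iterating(mapping):
    """Iterate over the live dict and add keys on the way; return the error text."""
    try:
        for key in mapping:
            mapping[key + 1000] = None  # changes the dict's size mid-iteration
    except RuntimeError as exc:
        return str(exc)
    return None


def mutate_over_snapshot(mapping):
    """Iterate over a list snapshot of the items; mutating the dict is then safe."""
    for key, _ in list(mapping.items()):
        mapping[key + 1000] = None
    return len(mapping)


print(mutate_while_iterating({1: None, 2: None}))  # dictionary changed size during iteration
print(mutate_over_snapshot({1: None, 2: None}))    # 4
```

With threads, building the snapshot itself can still be interrupted by a concurrent change, which is presumably why the original code wraps list(super().items()) in a retry loop.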

./utilities/configure.zcml:

  <utility
    provides="satzweiss.tools.asynchronous.interfaces.IScheduler"
    factory=".scheduler.InfoScheduler"
  />

./interfaces.py

from zope.interface import Interface

class IScheduler(Interface):
    pass

And two helper functions:

import plone.api as api

from dm.zodb.asynchronous.zope2 import transactional, PersistentContext
from zope.component.hooks import getSite, setSite
from Testing.makerequest import makerequest
from zope.globalrequest import setRequest
from zope.component import getUtility

#from satzweiss.tools.asynchronous.utilities.scheduler import getScheduler
from satzweiss.tools.asynchronous.interfaces import IScheduler


@transactional
def async_func(persistent, siteId, userId, uid, func, args, kwargs):

    app = makerequest(persistent['app'])

    setSite(app[siteId])
    setRequest(app.REQUEST)

    with api.env.adopt_user(userId):
        if uid:
            context = api.content.get(UID=uid)
            func = getattr(context, func, None)
        return func(*args, **kwargs)

def async_call(info, f, *args, **kwargs):
    context = getattr(f, '__self__', None)
    if context is not None:  # an empty container-like object (with __len__) is falsy
        uid = context.UID()
        func = f.__name__
    else:
        uid = None
        func = f
    
    scheduler = getUtility(IScheduler)

    return scheduler.schedule(
        async_func,
        PersistentContext(app=getSite().__parent__),
        info=info,
        siteId=getSite().id,
        userId=api.user.get_current().id,
        uid=uid,
        func=func,
        args=args,
        kwargs=kwargs
    )

I can now call it like this. If the function is a bound method, the context is passed by its UID and looked up again later. If it is a plain function, it is called directly.

sid = async_call("Let's do something", self.uks, rpc=False, fndFilter=[100178002], rmSwitch=True)
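The bound-method handling described above, splitting a method into a UID plus a method name and resolving it again in the worker, can be sketched without Plone. CATALOG, Content, decompose and recompose are hypothetical stand-ins; CATALOG plays the role of api.content.get(UID=...). The point is presumably that what gets scheduled is plain data rather than an object tied to the request's ZODB connection.

```python
import uuid

# Hypothetical stand-in for the Plone catalog: UID -> content object.
CATALOG = {}


class Content:
    """Minimal stand-in for a content object that has a UID() method."""

    def __init__(self, title):
        self.title = title
        self._uid = uuid.uuid4().hex
        CATALOG[self._uid] = self

    def UID(self):
        return self._uid

    def shout(self, suffix="!"):
        return self.title.upper() + suffix


def decompose(f):
    """Turn a bound method into (uid, method name); plain functions pass through."""
    context = getattr(f, "__self__", None)
    if context is not None:
        return context.UID(), f.__name__
    return None, f


def recompose(uid, func):
    """Resolve (uid, name) back into a callable, as async_func does in the worker."""
    if uid:
        context = CATALOG.get(uid)  # stands in for api.content.get(UID=uid)
        return getattr(context, func)
    return func


doc = Content("report")
uid, name = decompose(doc.shout)
print(uid == doc.UID(), name)            # True shout
print(recompose(uid, name)(suffix="?"))  # REPORT?
```

Note that built-in functions such as len also have a __self__ attribute (the builtins module), so this check is only reliable for methods and functions defined in Python.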

However, after a commit I get this:

Traceback (innermost last):
  Module ZPublisher.WSGIPublisher, line 162, in transaction_pubevents
  Module ZPublisher.WSGIPublisher, line 359, in publish_module
  Module ZPublisher.WSGIPublisher, line 254, in publish
  Module ZPublisher.mapply, line 85, in mapply
  Module ZPublisher.WSGIPublisher, line 63, in call_object
  Module ldkiid.base.content.stichtag, line 274, in foo
  Module satzweiss.tools.asynchronous.decorator, line 38, in async_call
  Module satzweiss.tools.asynchronous.utilities.scheduler, line 37, in schedule
  Module dm.zodb.asynchronous.scheduler, line 180, in schedule
  Module dm.zodb.asynchronous.context, line 104, in __init__
  Module dm.zodb.asynchronous.context, line 124, in identify
ValueError: obj <satzweiss.tools.asynchronous.utilities.scheduler.InfoScheduler object at 0x7f2d85b92f20> not part of the graph of persistent objects

The following code is from dm.zope.saml2.authority.move_handler.

  from zope.component.hooks import getSite
  site = getSite()
  sm = site.getSiteManager()
  sm.registerUtility(o, provided=ISamlAuthority)

This code registers o as a utility providing the interface ISamlAuthority. In your case, you must register a PersistentTransactionalScheduler instance. The provided argument is likely not necessary.

Ensure that you are in a portal context (otherwise, getSite may return the wrong site). You can use setSite to set up a portal context.

As you can see above, I already managed to create a utility. I used the documentation from here: Utilities — Plone Documentation v5.2
Could the error come from subclassing _PersistentSchedule and PersistentTransactionalScheduler in my own classes?

The error message tells you that the ...InfoScheduler object is not stored in the ZODB (its _p_jar or _p_oid is None). I do not know why.

Potentially, there is a problem when the scheduler is created in the same transaction as the call to schedule. This should not be the case, but in my use cases I created the scheduler manually in its own transaction, so creation in the same transaction may not have been tested.
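The condition named in the reply can be modelled in a few lines. FakePersistent and require_stored are hypothetical; they only mimic the idea that an object counts as stored once it has both a _p_jar (its connection) and a _p_oid, and they are not the implementation in dm.zodb.asynchronous.context.

```python
class FakePersistent:
    """Stand-in for persistent.Persistent: not yet stored, so no jar and no oid."""

    def __init__(self):
        self._p_jar = None  # the ZODB connection that owns the object
        self._p_oid = None  # the object id inside the database


def require_stored(obj):
    """Hypothetical check modelled on the error message, not the library's code."""
    if getattr(obj, "_p_jar", None) is None or getattr(obj, "_p_oid", None) is None:
        raise ValueError(f"obj {obj!r} not part of the graph of persistent objects")
    return obj._p_oid


fresh = FakePersistent()
try:
    require_stored(fresh)
except ValueError as exc:
    print("rejected:", exc)

# In real ZODB a commit (or Connection.add) assigns both attributes.
stored = FakePersistent()
stored._p_jar = object()
stored._p_oid = b"\x00\x00\x00\x00\x00\x00\x00\x01"
print(require_stored(stored) == stored._p_oid)  # True
```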

I made a persistent local utility out of it.

Create ./profiles/default/componentregistry.xml

<?xml version="1.0"?>
<componentregistry>
	<utilities>
		<utility
			name="sw_asynctool"
			factory="satzweiss.tools.asynchronous.utilities.scheduler.InfoScheduler"
			interface="satzweiss.tools.asynchronous.interfaces.IScheduler"
		/>
	</utilities>
</componentregistry>
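GenericSetup imports the class named in the factory attribute by its dotted name, so a misspelled package or module only shows up when the profile is (re)installed. A stdlib-only way to check such a name beforehand, for example from a debug shell, is sketched here; resolve_dotted is a hypothetical helper, not a GenericSetup API.

```python
import importlib


def resolve_dotted(name):
    """Import 'package.module.attr' and return attr; raise ImportError if it is wrong."""
    module_path, _, attr = name.rpartition(".")
    module = importlib.import_module(module_path)  # raises ModuleNotFoundError on typos
    try:
        return getattr(module, attr)
    except AttributeError as exc:
        raise ImportError(f"{module_path} has no attribute {attr!r}") from exc


# Example with a stdlib name; in Plone you would pass the factory string instead.
print(resolve_dotted("collections.OrderedDict"))  # <class 'collections.OrderedDict'>
```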

Then I reinstalled my add-on and used getUtility() like this:

scheduler = getUtility(IScheduler, name='sw_asynctool')

I can then call scheduler.schedule(), but I still get the same error:

  Module dm.zodb.asynchronous.scheduler, line 180, in schedule
  Module dm.zodb.asynchronous.context, line 104, in __init__
  Module dm.zodb.asynchronous.context, line 124, in identify
ValueError: obj <satzweiss.tools.asynchronous.utilities.scheduler.PersistentInfoSchedule object at 0x7fac33a722e0> not part of the graph of persistent objects

It happens here:

  def schedule(self, f, *args, **kw):
    schedule = self.SCHEDULE_FACTORY((f, args, kw))
    self._schedules[schedule.id] = schedule
    transaction.get().addAfterCommitHook(
      self._start,
      (self.PERSISTENT_CONTEXT_FACTORY(self, schedule),)
      )
    # This happens automatically
    # transaction.get().addAfterAbortHook(self._remove, (True, schedule.id,))
    return schedule.id
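schedule() does not start the task immediately: it registers self._start as an after-commit hook, so the work is launched only once the transaction that stored the schedule has been committed. The transaction package calls such a hook as hook(status, *args), where status tells whether the commit succeeded. ToyTransaction is a minimal stand-in for that contract, not the transaction package itself, and start() is hypothetical; whether _start checks the status in the same way is an assumption.

```python
class ToyTransaction:
    """Minimal model of the after-commit hook contract; not the transaction package."""

    def __init__(self):
        self._hooks = []

    def addAfterCommitHook(self, hook, args=(), kws=None):
        self._hooks.append((hook, tuple(args), dict(kws or {})))

    def commit(self, succeed=True):
        # Hooks run after the commit attempt and receive its status first.
        for hook, args, kws in self._hooks:
            hook(succeed, *args, **kws)
        self._hooks.clear()


started = []


def start(status, schedule_id):
    """Only launch the background work if the schedule was really committed."""
    if status:
        started.append(schedule_id)


txn = ToyTransaction()
txn.addAfterCommitHook(start, ("sid-1",))
txn.commit(succeed=False)  # failed commit: hook runs with status False
txn.addAfterCommitHook(start, ("sid-2",))
txn.commit()
print(started)  # ['sid-2']
```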

At the point where self.PERSISTENT_CONTEXT_FACTORY is called, these are the values:

self = <satzweiss.tools.asynchronous.utilities.scheduler.InfoScheduler object at 0x7f76d8447350 oid 0x89be3 in <Connection at 7f76ddb2c550>>
schedule = <satzweiss.tools.asynchronous.utilities.scheduler._PersistentInfoSchedule object at 0x7f76d894a2e0>

As you can see, the freshly created schedule from self.SCHEDULE_FACTORY does not have a connection, even though it inherits from persistent.Persistent. That is why it fails.

The utility itself works fine now. I can get it and use the same object from different requests and clients. Only creating a schedule creates the exception.

Well, ignore what I just said; it was my fault. I had overridden MAPPING_FACTORY with a plain dict again instead of using the OOBTree. No wonder it didn't work as expected.
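Apart from the identification error, a general ZODB rule makes a plain dict a poor MAPPING_FACTORY on a persistent object: ZODB notices changes through attribute assignment on Persistent instances, so in-place mutation of a plain dict attribute goes unnoticed unless _p_changed is set by hand. BTrees.OOBTree.OOBTree (and persistent.mapping.PersistentMapping) track their own changes. ToyPersistent below models only that rule and does not use ZODB.

```python
class ToyPersistent:
    """Toy model of ZODB change detection: only attribute assignment marks the object dirty."""

    def __init__(self):
        object.__setattr__(self, "_p_changed", False)

    def __setattr__(self, name, value):
        object.__setattr__(self, name, value)
        if not name.startswith("_p_"):
            object.__setattr__(self, "_p_changed", True)


sched = ToyPersistent()
sched._schedules = {}        # attribute assignment: detected
sched._p_changed = False     # pretend the transaction committed
sched._schedules["sid"] = 1  # in-place mutation of a plain dict: NOT detected
print(sched._p_changed)      # False
```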

At least I learned something from it. That is cool. :smiley:
