How to test a REST API that saves data outside the ZODB, in an RDBMS

In a project I have some Plone REST API endpoints that get some inputs and register some information in an annotation.

We have an extensive set of tests covering both the utility that writes the annotation and the REST API endpoints: they check the results for different inputs and that everything is written correctly to the annotation.

So far so good.

Now we are evaluating storing the information in an RDBMS because of some limitations and the ConflictErrors that we are getting in production.

Since the annotation writing is encapsulated in a utility, I replaced its implementation quite easily with SQLAlchemy, writing the data to PostgreSQL without much trouble.

But when I run the tests, many of them fail because the REST API doesn't find the data in the database.

To run those tests I am using SQLite instead of PostgreSQL, but I initially ran them against PostgreSQL and had the same issues.

For the test layer setup, I am using a WSGI_SERVER_FIXTURE much like it is done in plone.restapi to test the endpoints.

For instance, to test the endpoint that deletes some data from the database, I first populate the database and then issue the relevant API request. While the request is being handled, the database turns out to be empty, even uninitialized.
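One possible explanation for the "empty, even uninitialized" symptom, assuming the tests point at an in-memory SQLite database (the post does not say which URL is used): every new connection to `:memory:` gets its own private, empty database. SQLAlchemy's SQLite dialect documents that in-memory databases use one connection per thread, so a WSGI server running in another thread would see a fresh database. The sketch below shows the underlying behaviour with the standard library only; the table name `records` is made up for illustration.

```python
import sqlite3

# Connection used by the test to create the schema and the fixture data.
setup_conn = sqlite3.connect(":memory:")
setup_conn.execute("CREATE TABLE records (id INTEGER PRIMARY KEY, payload TEXT)")
setup_conn.execute("INSERT INTO records (payload) VALUES ('fixture row')")
setup_conn.commit()

# Second connection, standing in for the one opened while handling the request.
# ":memory:" does not name a shared database: this is a brand new, empty one.
request_conn = sqlite3.connect(":memory:")


def table_names(conn):
    """Return the names of all tables visible through this connection."""
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    return [name for (name,) in rows]


tables_seen_by_setup = table_names(setup_conn)
tables_seen_by_request = table_names(request_conn)
```

Here `tables_seen_by_setup` contains `records`, while `tables_seen_by_request` is empty even though the data was committed. This would not explain the same failure against PostgreSQL, so it is at most part of the picture; a file-based SQLite database avoids this particular trap.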

It looks like the data I populate in the test is not visible to the code that handles the request, because the two use different database sessions.

For the database setup I am using z3c.saconfig, which internally uses zope.sqlalchemy, which in turn integrates SQLAlchemy sessions with the Zope transaction machinery (the transaction package).

I have been looking at Euphorie (euphorie/Euphorie on GitHub), a risk assessment tool that has an extensive test suite and also uses SQLAlchemy in those tests. However, it uses browser tests rather than REST API tests, and I don't know whether that makes any difference.

I will keep investigating, but any hint will be appreciated. I will try to modify how I populate the database in the tests.


Over the years, I heavily mutilated example.trajectory from its original and ended up with the following in my db.py module to be able to reuse a database session. My issue was that a new session would attempt to remap an already mapped SQLAlchemy Table object.

FWIW, maybe you can use the code below as inspiration...


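import os

from plone import api
from sqlalchemy import create_engine
from sqlalchemy.orm import mapper, scoped_session, sessionmaker

# Base, log, DEBUG, _DB_CONFIG_LOCK, _PROFILE_ENGINE, _PROFILE_SESSION,
# initializeSession, _create_database_if_missing and
# ISQLAlchemyConnectionStrings are defined or imported elsewhere in db.py.


class ErpEdiemSession(object):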
    """Reusing a single instance of the database session client
    The session itself is not persisted, according to
    https://github.com/zopefoundation/zope.sqlalchemy/tree/1.6
    """

    # Enforce UTC as our base timezone. See UTCDateTime class below.
    os.putenv("PGTZ", "UTC")
    _instance = None
    _mapped_classes = {}

    def __new__(cls):
        """This code brought to you by ChatGPT
        using a singleton pattern
        """
        if cls._instance is None:
            cls._instance = super(ErpEdiemSession, cls).__new__(cls)
            initializeSession()  # Initialize the session
            # log.info('Initializing erpediem session object')
        else:
            # log.info('Not initializing session again')
            pass

        return cls._instance

    def __call__(self, dbconnection):
        """Always returns the scoped session
        <sqlalchemy.orm.scoping.scoped_session object at 0x7f9a708d5950>
        so that we can use self.session.get_bind() in the view
        """
        global _PROFILE_ENGINE
        global _PROFILE_SESSION
        global DEBUG

        """
        if _PROFILE_SESSION:
            log.info('Calling erpediem session object %s %s %s' % (self, dbconnection, _PROFILE_SESSION, ))
            log.info(self._mapped_classes)
        else:
            log.info('no profile session yet')
        """

        portal = api.portal.get()
        saconnect = ISQLAlchemyConnectionStrings(portal)
        self.dsn = saconnect[dbconnection]

        # log.info('erpediem profile engine %s' % _PROFILE_ENGINE)
        # log.info('erpediem profile session %s' % _PROFILE_SESSION)

        if not (_PROFILE_ENGINE and _PROFILE_SESSION):
            '''
            eleddy: since we moved this code out of the actual init, so that we can
            have a sane configuration setup, we MUST do this with a lock. Otherwise sql
            alchemy loses its shit and goes on a 'fuck your multi-threading - I'll eat
            pancakes on your grave!' tirade. Then you spend your friday
            night sober and staring at an abyss of configuration headaches and
            infinite loops usually reserved for Mondays.
            '''
            with _DB_CONFIG_LOCK:
                if not _PROFILE_ENGINE:
                    _create_database_if_missing(self.dsn)
                    _PROFILE_ENGINE = create_engine(
                        self.dsn,
                        pool_size=5,
                        pool_recycle=3600,
                        convert_unicode=True,
                        echo=DEBUG,
                    )
                    # reflect existing tables
                    Base.metadata.reflect(_PROFILE_ENGINE)

                    # create new tables
                    Base.metadata.create_all(_PROFILE_ENGINE)

                    """
                    for _t in Base.metadata.tables:
                        columns =  Table(_t, Base.metadata).columns
                        log.info("Table: %s" % columns)
                    """

                if not _PROFILE_SESSION:
                    _PROFILE_SESSION = scoped_session(sessionmaker(bind=_PROFILE_ENGINE))

        return _PROFILE_SESSION

    def map_class(self, class_to_map, table, primary_key):
        """This code brought to you by ChatGPT
        using a registry of mapped classes
        """
        # log.info('Mapped classes: %s' % self._mapped_classes)
        if class_to_map not in self._mapped_classes:
            # log.info('Attempting to map: %s' % class_to_map)
            mapper(class_to_map, table, primary_key=primary_key)
            self._mapped_classes[class_to_map] = True
        else:
            # log.info('Not mapping: %s again' % class_to_map)
            pass

The API request uses its own transaction which opens a new connection to the database. You should use a functional test layer and commit the transaction in between populating the database and making the API request.
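To illustrate why the commit matters, here is a minimal sketch using only the standard library. It is a stand-in for the real setup, not the Plone stack: a file-based SQLite database replaces PostgreSQL, a worker thread with its own connection replaces the API request, and the `records` table is hypothetical. In the real tests the equivalent of `test_conn.commit()` would presumably be `transaction.commit()`, given that the session is joined to the transaction by zope.sqlalchemy.

```python
import os
import sqlite3
import tempfile
import threading

# File-based database so that several connections share the same data.
db_path = os.path.join(tempfile.mkdtemp(), "test.db")


def count_rows_in_other_thread():
    """Stand-in for the API request: separate thread, separate connection."""
    result = []

    def handler():
        conn = sqlite3.connect(db_path)
        try:
            row = conn.execute("SELECT COUNT(*) FROM records").fetchone()
            result.append(row[0])
        finally:
            conn.close()

    worker = threading.Thread(target=handler)
    worker.start()
    worker.join()
    return result[0]


# Test setup: create the schema.
test_conn = sqlite3.connect(db_path)
test_conn.execute("CREATE TABLE records (id INTEGER PRIMARY KEY, payload TEXT)")
test_conn.commit()

# Populate the database but do not commit yet: the insert is only
# visible inside the test's own transaction.
test_conn.execute("INSERT INTO records (payload) VALUES ('fixture row')")
seen_before_commit = count_rows_in_other_thread()

# Commit between populating and "requesting": now the other connection sees it.
test_conn.commit()
seen_after_commit = count_rows_in_other_thread()
test_conn.close()
```

The "request" sees zero rows before the commit and one row after it, which matches the symptom described in the question.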

Then you'll probably need some way to reset the test database before starting a new test run...
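Because committed fixture data outlives the test that created it, something has to clear it again. A minimal sketch of one way to do that, again with plain sqlite3 and a hypothetical `records` table; the helper name `reset_database` is made up. With SQLAlchemy, calling `metadata.drop_all(engine)` followed by `metadata.create_all(engine)` would be a comparable approach.

```python
import sqlite3


def reset_database(conn):
    """Delete every row from every user table while keeping the schema.

    Foreign key ordering is ignored here; a schema with enforced
    constraints would need the tables emptied in dependency order.
    """
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    # Skip SQLite's internal bookkeeping tables.
    tables = [name for (name,) in rows if not name.startswith("sqlite_")]
    for name in tables:
        # Table names come from sqlite_master, not from user input.
        conn.execute('DELETE FROM "{}"'.format(name))
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE records (id INTEGER PRIMARY KEY, payload TEXT)")
conn.execute("INSERT INTO records (payload) VALUES ('left over from a previous test')")
conn.commit()

reset_database(conn)
remaining = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]
```

In a plone.testing layer, a helper like this could be called from `testTearDown`, or from each test's `setUp`, so that every test starts from an empty database.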


Thank you for your input, @mtrebron. I will try the db module you pointed to.

@davisagli I am already using a FunctionalTesting layer with a WSGI_SERVER_FIXTURE as its base (much like in plone.restapi). Thanks for pointing it out!

I use a custom container with an MSSQL database for some tests. It is prefilled at build time, and the queries happen via pyodbc. In the functional tests I do something like this, with a transaction commit:

def test_stationslist_no_param_search(self):
    import json

    api_session = RelativeSession(self.portal.absolute_url(), test=self)
    api_session.headers.update({"Accept": "application/json"})
    api_session.auth = (TEST_USER_NAME, TEST_USER_PASSWORD)

    # Internally, this endpoint runs a SQL query against the MSSQL database.
    response = api_session.get(
        "/@contactsearch",
        params={"cat": "stations"},
        proxies=self._proxies,
    )
    transaction.commit()
    self.assertEqual(response.status_code, 500)
    error = json.loads(response.text)
    self.assertEqual(error["type"], "QueryParsingError")