Strolling through garden of delights

I was tasked with writing some new tests in our test framework. It uses Locust, a load-testing framework written in Python that makes it easier to write HTTP test suites. Getting this simple feature in turned out to be a whirlwind of an experience, which was quite fun.

PoC

I started learning Locust by writing a Proof of Concept (PoC) that simply sends one request to the endpoint I needed to test, using a simple User class. Locust needs a main entry file to run the tests, and the tests themselves consist of tasks defined within User classes. My PoC was as follows, starting with locustfile.py:

from worker import HubUser
from multiprocessing.shared_memory import SharedMemory
import logging
import uuid
import pickle
from locust import events

@events.init.add_listener
def on_locust_init(environment, **kwargs):
    if "no-shared-device" in environment.parsed_options.tags:
        return
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(uuid.uuid4().hex)
    pickled = pickle.dumps(logger)

    environment.shm = SharedMemory("logger", True, len(pickled))
    for i,b in enumerate(pickled):
        environment.shm.buf[i] = b

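@events.quitting.add_listener
def on_locust_quitting(environment, **kwargs):
    # shm only exists if the init listener above actually created the segment
    shm = getattr(environment, "shm", None)
    if shm is None:
        return
    shm.close()
    shm.unlink()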

worker.py

from locust.user import HttpUser
from locust.runners import MasterRunner
from locust import task, tag, events

import logging
import uuid
from multiprocessing.shared_memory import SharedMemory
from multiprocessing import Lock
import pickle

logging.basicConfig(level=logging.INFO)

@events.init.add_listener
def on_locust_init(environment, **kwargs):
    if isinstance(environment.runner, MasterRunner):
        return
    if "no-shared-device" in environment.parsed_options.tags:
        return
    HubUser.shm = SharedMemory("logger", create=False)
    # There is no `self` in a module-level listener, so read from the class attribute
    HubUser.logger = pickle.loads(HubUser.shm.buf)


class HubUser(HttpUser):
    shm = None
    logger = logging.getLogger(__name__)

    def on_start(self):
        self.logger.info(self)
        self.lock = Lock()
        self.print = True

    def on_stop(self):
        if self.shm:
            self.shm.close()

    @tag("hub:baseline:registry")
    @task
    def registry_baseline(self):
        resp = self.client.get("/v2")
        self.lock.acquire()
        if self.print:
            self.logger.info(resp.content)
            self.print=False
        self.lock.release()

    @tag("hub:baseline:token")
    @task
    def registry_token_baseline(self):
        """Get a simple request to just token_auth, without actual creds"""
        resp = self.client.get("/token-auth")
        self.lock.acquire()
        if self.print:
            self.logger.info(resp.content)
            self.print=False
        self.lock.release()

In essence, I wanted to see how the distributed code worked and how the spawned users could share the same data. I tried out the SharedMemory class for this and it worked great. All users shared the same logger, but the lock within the class instance was meaningless, as each simulated user got its own instance (and therefore its own lock) in my version of Locust, which was 2.8.4. The PoC itself tested simple requests to the Docker Registry HTTP API.
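The shared-memory round trip at the heart of the PoC can be sketched in isolation. This is a minimal sketch and not the PoC itself: publish, fetch and demo are hypothetical helper names, a plain dict stands in for the logger, and both sides run in a single process for brevity.

```python
import pickle
import uuid
from multiprocessing.shared_memory import SharedMemory


def publish(name, obj):
    """Pickle obj into a newly created shared memory segment and return the handle."""
    payload = pickle.dumps(obj)
    shm = SharedMemory(name=name, create=True, size=len(payload))
    shm.buf[:len(payload)] = payload  # slice assignment instead of a byte-by-byte loop
    return shm


def fetch(name):
    """Attach to an existing segment, unpickle a copy of its contents, then detach."""
    shm = SharedMemory(name=name, create=False)
    try:
        # Copy the bytes out first; pickle stops reading at the end of the
        # pickled object, so any padding in the segment is harmless.
        return pickle.loads(bytes(shm.buf))
    finally:
        shm.close()


def demo():
    name = "poc_" + uuid.uuid4().hex[:8]  # unique name to avoid clashing with leftovers
    owner = publish(name, {"level": "INFO", "run": 1})
    try:
        return fetch(name)
    finally:
        owner.close()
        owner.unlink()  # only the creator should remove the segment
```

Every reader gets its own unpickled copy, so this shares a snapshot of the data rather than a live object.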

Dependency Hell

When the code got into the test framework, I could not build the Docker image at first. The build complained about an incompatible dependency. I massaged that away by pinning a different version of Flask and/or Jinja2. Then the Docker image would not start because of a similar dependency problem, this time in either Jinja2 or Werkzeug, both pulled in by Flask, which is a dependency of Locust. I upgraded the Locust version in the test framework, 1.5.3, to the one from my PoC, and then it all worked.

Killers on the loose

Once many users were running across the Worker nodes, the containers kept getting killed due to Out of Memory (OOM) problems. I traced the problem to a single method in the boot-up path of the Worker nodes, run for each node and/or user. It was located within the on_start method, which the Locust framework calls when starting the tests.

The method allocated 8 MB on first use and 2 MB on each subsequent execution. For one particular test with 40 users, that meant 8 + 2 * 40 = 88 MB. The framework took some memory as well, and then I saw another execution path that took another 40 * 2 = 80 MB. The container was limited to 200 MB, of which 168 MB was now gobbled up by two methods. The Python VM itself also needs memory, usually around 30 MB, and during profiling I saw that a bare-bones Locust startup with our framework code used 50 MB in total. That adds up to 168 + 50 = 218 MB, so we were short some 20 MB, sadly.
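The back-of-the-envelope budget above, written out as a small sketch. The figures are the ones quoted in the text; the constant and function names are made up for illustration.

```python
USERS = 40
ON_START_FIRST_MB = 8      # cost of the first on_start call
ON_START_REPEAT_MB = 2     # cost of each further call, counted once per user as in the text
SECOND_PATH_MB = 2         # the other execution path, per user
BASELINE_MB = 50           # bare-bones Locust plus framework code, as profiled
CONTAINER_LIMIT_MB = 200


def memory_budget(users=USERS):
    """Return the estimated memory use in MB and how far it overshoots the limit."""
    on_start = ON_START_FIRST_MB + ON_START_REPEAT_MB * users
    second_path = SECOND_PATH_MB * users
    total = on_start + second_path + BASELINE_MB
    return {
        "on_start_mb": on_start,
        "second_path_mb": second_path,
        "total_mb": total,
        "shortfall_mb": total - CONTAINER_LIMIT_MB,
    }
```

With the defaults this gives 88 MB and 80 MB for the two paths, 218 MB in total, and an overshoot of 18 MB, which is the "some 20 MB" mentioned above.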

Singleton

I thought: what if I make part of the code a singleton? I only need one instance of this particular class, and only one invocation of it. That would limit the cost to a one-time 8 MB increase. It still left the other 40 * 2 MB path, but I would deal with that if need be. This code went through many versions. The first was just a Singleton class along the following lines:

class UserSingleton:
    __instance = None

    @staticmethod
    def instance(environment):
        if UserSingleton.__instance is not None:
            return UserSingleton.__instance
        return UserSingleton(environment)

    def __init__(self, environment):
        # Do stuff with environment, initialize the class and the code used to be in on_start
        UserSingleton.__instance = self

The singleton got created inside an event hook that comes with Locust, called init. Perfect.

Profiling

As I kept profiling and running this code, the process held on to the memory until the end. I quickly read up that Python tends to keep freed memory inside the VM until the process ends instead of releasing it back to the OS. That gave me the next idea.

Parallel Process

Use Process from the multiprocessing module to run the code in a separate process, so that the memory it allocates does not impact the main process. The catch, of course, is that this starts another Python VM plus the code you run. So it has to execute once and only once, and then we should be all good. I also used Pipe from the multiprocessing module to communicate the results back to the main process. All of this worked perfectly.
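Stripped of everything framework-specific, the pattern looks roughly like this. It is a sketch under assumptions: expensive_bootstrap and run_isolated are hypothetical names, the token is a placeholder, a bytearray stands in for the memory-hungry setup, and the "fork" start method is requested explicitly, so it only runs on POSIX systems.

```python
import multiprocessing as mp


def expensive_bootstrap(conn, size_mb):
    """Stand-in for the memory-hungry setup; runs in the child process only."""
    scratch = bytearray(size_mb * 1024 * 1024)  # memory that dies with the child
    conn.send({"token": "example-token", "scratch_bytes": len(scratch)})
    conn.close()


def run_isolated(size_mb=8):
    """Run the bootstrap in a child process and return (result, exit code)."""
    ctx = mp.get_context("fork")  # assumption: POSIX, where fork is available
    parent_conn, child_conn = ctx.Pipe()
    proc = ctx.Process(target=expensive_bootstrap, args=(child_conn, size_mb))
    proc.start()
    child_conn.close()            # the parent only needs its own end of the pipe
    result = parent_conn.recv()   # read before join() so the child never blocks in send()
    proc.join()
    parent_conn.close()
    return result, proc.exitcode
```

Only the small result dictionary crosses the pipe; whatever the child allocated is returned to the OS when it exits. The actual bootstrap code from the framework follows the same shape.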

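# SpecificUser, GatewayDockerHubTasks, HubAuthTasks and HubUser come from our test framework
from ctypes import c_wchar_p
from multiprocessing import Pipe, Process, Value

import locust
import locust.runners

def docker_bootstrap(environment, conn):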
    SpecificUser.host = environment.host
    specific_user = SpecificUser(environment=environment)
    specific_user.on_start()
    device_hub_tasks = GatewayDockerHubTasks(specific_user)
    docker_auth = device_hub_tasks.hub_creds().json()
    environment.docker_auth_username = Value(c_wchar_p, docker_auth['Username'], lock=False)
    environment.docker_auth_password = Value(c_wchar_p, docker_auth['Secret'], lock=False)
    HubUser.host = environment.host
    hub_user = HubUser(environment=environment)
    hub_user.docker_auth = docker_auth
    conn.send(
        (
            docker_auth,
            HubAuthTasks(hub_user).get_device_token().json()["token"],
        )
    )

@locust.events.init.add_listener
def init(environment, **kwargs):
    if isinstance(environment.runner, (locust.runners.LocalRunner, locust.runners.WorkerRunner)):
        if not hasattr(environment, "initialized"):
            parent_conn, child_conn = Pipe()
            p = Process(target=docker_bootstrap, args=(environment, child_conn))
            p.start()
            # Receive before join() so the child can never block in send() while we wait
            (
                environment.docker_auth,
                docker_token,
            ) = parent_conn.recv()
            p.join()
            environment.docker_auth_username = Value(c_wchar_p, environment.docker_auth['Username'])
            environment.docker_auth_password = Value(c_wchar_p, environment.docker_auth['Secret'])
            environment.docker_token = Value(c_wchar_p, docker_token)
            environment.initialized = True

It still got OOM killed. The test run resets the stats after a while, because otherwise the startup time skews the results. On a stats reset, the code gets executed again by all Users (about 40 in our case). That meant the code that was supposed to be called only once now got called 40 more times. Regardless of the singleton, the method got called again and again. It also triggered another error when the container restarted: it could not initialize a particular Netshaper that adds some extra behaviour to the sockets.

Synchronization

Okay, so next I needed to synchronize the init event hook listener to make sure that it would execute exactly once, even on a stats reset. I used a property that does not exist until initialization has happened to keep track of it, plus a simple Lock from the multiprocessing library to synchronize the execution. Perfect. It worked like a charm. Sort of. I was still setting up a dictionary and some strings on the environment object, and all 40 Greenlets were trying to access them, which meant every code path was blocking.
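The run-once guard described above can be reduced to a few lines. This is a sketch rather than the framework code: plain threads and threading.Lock stand in for Locust's Greenlets and the multiprocessing Lock, and Environment, init_once and demo are hypothetical names.

```python
import threading


class Environment:
    """Hypothetical stand-in for the Locust environment object."""


_init_lock = threading.Lock()


def init_once(environment, bootstrap):
    """Run bootstrap() at most once per environment, however many callers race."""
    with _init_lock:
        # The attribute does not exist until the first caller has finished.
        if not hasattr(environment, "initialized"):
            environment.result = bootstrap()
            environment.initialized = True
    return environment.result


def demo(workers=40):
    env = Environment()
    calls = []

    def bootstrap():
        calls.append(1)  # count how often the expensive part really runs
        return "token"

    threads = [
        threading.Thread(target=init_once, args=(env, bootstrap))
        for _ in range(workers)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return len(calls), env.result
```

All 40 callers go through the lock, but only the first one pays for the bootstrap; the rest find the flag already set.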

Low level parallel access

I ended up using Value from multiprocessing with c_wchar_p from ctypes, with the lock argument of Value set to False. This gave me one string that could be read in parallel without locking, and I would then copy the actual value of the Value object onto the class instances so that each had its own copy of the string. One caveat: c_wchar_p stores a pointer, so the string it refers to is only valid inside the process that created it.
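A minimal sketch of that idea, kept within a single process. Environment, User and demo are hypothetical stand-ins, and "example-token" is a placeholder value.

```python
from ctypes import c_wchar_p
from multiprocessing import Value


class Environment:
    """Hypothetical stand-in for the Locust environment object."""


class User:
    """Hypothetical stand-in for a Locust user."""

    def __init__(self, environment):
        self.environment = environment
        self.docker_token = None

    def on_start(self):
        # .value returns a regular Python str, so each user keeps its own copy
        self.docker_token = self.environment.docker_token.value


def demo(count=3):
    env = Environment()
    # lock=False returns the raw ctypes object without a synchronizing wrapper
    env.docker_token = Value(c_wchar_p, "example-token", lock=False)
    users = [User(env) for _ in range(count)]
    for user in users:
        user.on_start()
    return [user.docker_token for user in users]
```

Because each user copies the string once in on_start, the shared object is never touched again on the hot path of a request.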


lock = Lock()

@locust.events.init.add_listener
def init(environment, **kwargs):
    if isinstance(environment.runner, (locust.runners.LocalRunner, locust.runners.WorkerRunner)):
        with lock:
            if not hasattr(environment, "initialized"):
                parent_conn, child_conn = Pipe()
                p = Process(target=docker_bootstrap, args=(environment, child_conn))
                p.start()
                # Receive before join() so the child can never block in send() while we wait
                (
                    environment.docker_auth,
                    docker_token,
                ) = parent_conn.recv()
                p.join()
                environment.docker_auth_username = Value(c_wchar_p, environment.docker_auth['Username'])
                environment.docker_auth_password = Value(c_wchar_p, environment.docker_auth['Secret'])
                environment.docker_token = Value(c_wchar_p, docker_token)
                environment.initialized = True

# Other code
    def on_start(self):
        if not self.image_tag:
            raise RuntimeError("A IMAGE_TAG env variable is mandatory")
        if not self.docker_manifest_list_digest:
            raise RuntimeError("A MANIFEST_LIST_DIGEST env variable is mandatory")
        if self.server_ip:
            NamedHTTPAdapter.install(self.client, self.server_ip)
        if ENABLE_REDIRECTS:
            self.allow_redirects = True
        self.docker_token = self.environment.docker_token.value
        self.docker_auth_username = self.environment.docker_auth_username.value
        self.docker_auth_password = self.environment.docker_auth_password.value

Success

Success, and not only that: it ran quite a bit faster than before. So I wrapped up this work. It still irked me that the others had never run into these problems before, such as the on_start method being hit so many times on a stats reset.

Missed a commit

Turns out I had missed a commit in which the memory of the containers was bumped up a bit for deployment into our k8s cluster. Also my team was, rightfully so as it turned out, not really keen on upgrading Locust because it might break too much. I ended up rolling the version back and explicitly pinning the versions of Flask, Jinja2 and Werkzeug found in an older Docker image to see if that would work.

I rolled back and it all worked again.

Breaking stuff

I had actually broken a lot. The helm deployments did not work anymore, and the reset stats behaviour was caused by the upgrade to the newer Locust, so it was a good thing I rolled back.

Simplifying again

On a whim, I decided to drop the singleton code and see what would happen. Nothing happened is what happened. Well, it just ran. So the reset stats “bug” (or feature, I am not sure) was in the newer version. D'oh.

So I could greatly simplify the code again. Then I dropped more and more things from my code so it went from being quite simple to quite complex to dead simple over the span of one very long PR.

Request per second miscalculation

Another thing that was bothering me was the low number of requests per second, which had led me to try to optimise the code for speed.

I also miscalculated the requests per second. Or misinterpreted them, or maybe both. I was surprised to be hitting only 20-30 requests per second with 1000 simulated users. I was expecting 1000/s, as if every user would send at least one request each second. That is not what the stat represents. It represents requests that were actually sent and completed, which meant our service simply could not handle the traffic being sent its way, and that is why I was seeing such low numbers. I completely forgot to take that into account. D'oh again.
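One way to sanity-check such numbers is Little's law for a closed loop, where each simulated user waits for its response before sending the next request. The sketch below is only that rule of thumb: the function names are made up, and 25 is simply the midpoint of the 20-30 range quoted above.

```python
def expected_rps(users, avg_response_s, wait_s=0.0):
    """Requests per second when every user loops: request, wait for response, pause."""
    return users / (avg_response_s + wait_s)


def implied_cycle_time_s(users, observed_rps):
    """Average seconds each user spends per request, given the observed throughput."""
    return users / observed_rps
```

implied_cycle_time_s(1000, 25) comes out at 40 seconds per request per user, while the 1000/s I was expecting would need every request, including any wait time between tasks, to complete within one second.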

Conclusion

I learned a great deal about multiprocessing in Python and fixed a somewhat complicated bug that seemed to require just this type of fix: code that must be executed only once while 40 users try to execute it simultaneously. It involved memory profiling, understanding code paths, events, parallel computing, concurrency and some low-level Python objects. It also made me realize how quickly I can stray from the path when I do not talk things through with someone else. Fully remote working takes some time to get used to, I guess. Still, I will keep this knowledge with me.