
Greenspun’s tenth rule or How I was building a distributed application in Python! (And some words of Erlang)

Introduction

Recently I was tasked with building a small PoC of a platform for manipulating various types of IoT devices, ranging from simple light bulbs to complex cooking stations. An essential requirement was keeping the platform responsive, with minimal latency, for a huge number of concurrent users. In this article, I will explain the problems I came across and show solutions that may be optimal and time-tested, yet non-standard for many programming languages. The reference code won't describe every detail, focusing instead on the most significant parts.

I hope you will enjoy the read!

1) Design decisions

After initially considering a monolithic architecture for its shorter development time, I decided to stick with a microservice architectural style due to the flexibility it provides.

The following microservice components were designed:

  • Frontend Service — a microservice that provides a UI for a regular user to manipulate the business entities on the platform in a familiar way.
  • Authentication Service — a component that provides a way to authenticate and authorize a user.
  • Device Management Service — a microservice that serves as an adapter between the frontend WebSockets API and the specific device-type microservices (Light bulb microservice, Sous-Vide cooking station microservice, etc.)
  • Specialized Device Service — a set of microservices that connects different device types with their physical representations on one side and with the aggregator, in the form of the Device Management Service, on the other side.
  • Statistics Service — a microservice that gathers statistical information from devices.

2) Technological stack

Without going into too much detail the following technologies were used:

  • Angular 7 — used for Frontend development;
  • Python 3.7 (Django) — used for Authentication Service;
  • Python 3.7 (PyPy + Uvicorn + FastAPI + asyncio) — used for the Device Management Service to solve the 40k-connection problem while keeping a rather simple, Flask-like backend interface;
  • Clojure 1.10 — used for the Sous-Vide microservice (a subtype of the general Specialized Device Service used in the PoC), chosen for its good response times and strong multithreading and asynchrony capabilities;
  • Python 3.7 (PyPy + Uvicorn + FastAPI + asyncio) — used for Statistics Service;
  • Redis — used as a message broker to establish two-way communication between Device Management Service and Specialized Device Service types;
  • Mosquitto — an MQTT broker for delivering messages between physical devices and their Specialized Device Service counterparts;
  • C11 — used for embedded programming on the physical devices themselves (ESP8266-based boards).

3) What might have gone wrong?

Well, if an article mentions Greenspun’s tenth rule, you know something didn’t go as smoothly as expected. For those not familiar with this heuristic, here it is. Greenspun’s tenth rule of programming is an aphorism in computer programming, and especially in programming-language circles, that states: Any sufficiently complicated C or Fortran program contains an ad-hoc, informally-specified, bug-ridden, slow implementation of half of Common Lisp.

Simply put, this means that if you are trying to implement a complicated program in C, chances are you are rewriting parts of Common Lisp’s standard features. We are going to focus on the concurrency addition to this rule, which states: Any sufficiently complicated concurrent program in another language contains an ad-hoc, informally-specified, bug-ridden, slow implementation of half of Erlang.

4) Implementation details

Once the architecture was complete and development started, I found myself needing to design the Device Management Service in Python to be highly available, concurrent and performant enough to keep up with a load of at least 20,000 potential users.

5) Attaching a device list to the WebSocket request problem

Python (even though we are using PyPy) has a well-known restriction on using multithreading for CPU-bound tasks: the GIL (Global Interpreter Lock). Since our project won’t be doing any number crunching or heavy processing, we are left with IO-bound tasks, such as keeping multiple concurrent socket connections open. This is easily solved with the asyncio module, so we are good there.
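As a minimal illustration of why asyncio fits this workload (the connection handler here is a made-up stand-in for real socket IO), a single thread can multiplex many concurrent IO waits:

```python
import asyncio

async def handle_connection(conn_id: int) -> str:
    # Simulate an IO-bound wait (e.g. reading from a socket);
    # the event loop services other connections in the meantime.
    await asyncio.sleep(0.01)
    return f"connection {conn_id} handled"

async def main() -> list:
    # Hundreds (or thousands) of these coroutines run on one thread,
    # because each yields control to the loop while waiting on IO.
    return await asyncio.gather(*(handle_connection(i) for i in range(100)))

results = asyncio.run(main())
print(len(results))  # 100
```

The GIL is irrelevant here: the coroutines never compute concurrently, they only wait concurrently, which is exactly the shape of our socket-heavy workload.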

Since running multiple workers can also increase throughput considerably, we want that too.

Taking into account these decisions, this is the code that was produced:

class Device(BaseModel):
    id: int
    description: str
    status: str
    connected: bool
    targetTemp: Optional[float] = None
    timestamp: Optional[int] = None
    time: Optional[int] = None

connections: List[Tuple[int, WebSocket, List[Device]]] = []

@app.websocket_route("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    token = websocket.query_params["token"]
    payload = jwt.decode(token, SECRET, algorithms=[ALGORITHM])
    user_id = payload["user_id"]
    # Query all devices belonging to the authenticated user
    rows = await database.fetch_all(devices.select().where(devices.c.user == user_id))
    user_devices = [Device(id=dv[0], description=dv[1], status=dv[2], connected=dv[4])
                    for dv in rows]
    connections.append((user_id, websocket, user_devices))

As you can see, this piece of code accepts the WebSocket request, queries all the devices that belong to the user and saves them in the connections list.

The problem: Multiple workers don’t share the same connection set

Proposed solution: Introduce a way to synchronize workers via Redis Pub/Sub

@staticmethod
def find_connection(user_id: int, device_id: int) -> Optional[WebSocket]:
    for (user, connection, user_devices) in connections:
        if user == user_id:
            for dv in user_devices:
                if device_id == dv.id:
                    return connection
    return None

This function checks whether the current process’s connections list contains the specified user with a device connected under the given id.
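To make the proposed Redis Pub/Sub synchronization concrete, here is a sketch of the publishing side. The `make_sync_message` helper is hypothetical, but the message shape (`user`/`device`/`device_delta` on a `connections` channel) mirrors the payloads used in the synchronization code later in the article:

```python
import json

def make_sync_message(user_id: int, device_id: int, delta: dict) -> str:
    """Build the JSON payload broadcast on the 'connections' channel
    so that every worker can update its local connection list."""
    return json.dumps({"user": user_id, "device": device_id, "device_delta": delta})

# Publishing side (requires a running Redis; redis-py client assumed):
#
#   import redis
#   r = redis.Redis()
#   r.publish("connections", make_sync_message(42, 7, {"status": "off"}))

msg = make_sync_message(42, 7, {"status": "off"})
print(json.loads(msg)["device_delta"]["status"])  # off
```

Every worker subscribes to the same channel, so a state change published by one worker eventually reaches the worker that actually holds the affected connection.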

6) Realtime device update problems

Suppose we have the HTTP endpoints for creating new devices and updating existing ones that look like the following ones:

@app.patch("/devices/{item_id}")
async def patch_device(*, request: Request, item_id: int, change_params: ChangeParams, background_tasks: BackgroundTasks):
    # Note: combine SQLAlchemy conditions with `&` (or `and_`), not Python's
    # `and` operator, which would silently drop the first condition
    query = devices.update().where(
        (devices.c.user == request.user.display_name) & (devices.c.id == item_id)
    ).values(time=change_params.time, timestamp=change_params.current_time,
             targetTemp=change_params.targetTemp)
    await database.execute(query)


@app.post("/devices/", response_model=Device, status_code=201)
async def create_device(request: Request, device: DeviceRequest):
    query = devices.insert().values(description=device.description, status="off",
                                    user=request.user.display_name, connected=False)
    record_id = await database.execute(query)
    dv: Device = Device(id=record_id, description=device.description, status="off",
                        connected=False)
    return dv

The problem: The worker holding a given user’s connection doesn’t get notified when that user’s set of devices changes

Proposed solution: Introduce special message forms that are broadcast to all the workers via Redis, so that the worker holding the user’s device list can find and update it

def add_device(user_id: int, device: Device) -> bool:
    for (user, connection, user_devices) in connections:
        if user == user_id:
            for dv in user_devices:
                if device.id == dv.id:
                    return False
            user_devices.append(device)
            return True
    return False


def edit_device(user_id: int, device_id: int, **kwargs):
    for (user, connection, user_devices) in connections:
        if user == user_id:
            for i, dv in enumerate(user_devices):
                if device_id == dv.id:
                    user_devices[i] = dv.copy(update=kwargs)
                    
async def perform_update(self):
    if self.msg_type == "set_temp":
        if self.extra["temp"] == 0:
            print(f"Setting device {self.device_id} to off")
            query = devices.update().where(
                (devices.c.user == self.user) & (devices.c.id == self.device_id)
            ).values(status="off", time=None, targetTemp=None, timestamp=None)
            await database.execute(query)
            pub.publish_json('connections',
                             {"user": self.user, "device": self.device_id, "device_delta": {"status": "off"}})
                             
async def reader(mpsc: Receiver):
    async for channel, msg in mpsc.iter():
        assert isinstance(channel, AbcChannel)

        if channel.name == b"connections":
            try:
                # Normalize single-quoted payloads so they parse as JSON
                msg = msg.decode("utf-8").replace("'", '"')
                redis_msg = json.loads(msg)

                user = redis_msg["user"]

                if redis_msg.get("device_delta", None) is not None:
                    device_delta = redis_msg["device_delta"]
                    device_id = redis_msg["device"]
                    edit_device(user, device_id, **device_delta)
                    continue

                device = redis_msg["device"]
                device = json.loads(device)
                dv: Device = Device(**device)
                add_device(user, dv)
            except Exception as e:
                print(e, flush=True)

Thus, we had to introduce multiple synchronization helper functions just to make simple updates possible across different processes.

7) Process monitoring problem

We cannot claim a program is reliable without some form of monitoring interface that reports errors and keeps the system in a stable state even when they occur.

The problem: A worker might encounter an exception or fail at a random point

Proposed solution: Introduce a supervisor interface like supervisord.
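A minimal supervisord configuration for this setup might look like the following sketch (the program name, command and paths are illustrative, not taken from the actual project):

```ini
[program:device_management]
; Run several Uvicorn workers on consecutive ports and restart any that crash
command=uvicorn app:app --port 80%(process_num)02d
process_name=%(program_name)s_%(process_num)02d
numprocs=4
autostart=true
autorestart=true
stderr_logfile=/var/log/device_management.err.log
```

With `autorestart=true`, a worker that dies from an unhandled exception is brought back up automatically, which is the minimum we need to keep the service available.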

8) Accidental vs Essential complexity problem

It is undeniable that software development problems are very complex, even before touching parallel computing or concurrent systems. But how much of that complexity is essential to the problem itself, and how much is accidental? This distinction is explored in detail by Fred Brooks in his essay “No Silver Bullet”. In our example, one might argue that all the synchronization patterns we used are essential to the design of the program in Python. That holds only if Python was the only option… but it wasn’t.

9) Erlang/Elixir as an alternative

What do we know about Erlang and how can it help us?

Erlang is a general-purpose concurrent, functional programming language, and a garbage-collected runtime system.

The Erlang runtime system is known for its design decisions that are well suited for systems with these traits:

  • Distributed;
  • Fault-tolerant;
  • Soft real-time;
  • Highly available, non-stop;
  • Hot-swapping, where code can be changed without stopping a system.

If you are familiar with these terms, you might have already guessed that the traits listed above are exactly what our application needs.

N.B. Since Erlang is more than 30 years old, its syntax may look a little odd to Python programmers, so we’ll look at examples in Elixir instead. Elixir is a relatively new language created to solve some of Erlang’s long-standing problems while keeping the power of the BEAM runtime and offering a syntax friendlier to newcomers.

def hello do
    IO.puts "hello world"
end

Starting simple, this is a ‘Hello World’ application in Elixir. We immediately notice the lack of parentheses: in Elixir, parentheses around function arguments are optional, so you can mentally read it as

def hello() do
  IO.puts("hello world")
end

Next example shows the simplicity and power of Elixir/Erlang’s runtime system:

def show_spawn do
    spawn fn -> 1 + 2 end
end

The `spawn` function, given a closure, creates a lightweight BEAM process (often described as a green thread, and considerably more powerful than async/await) that computes `1 + 2` concurrently.
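For Python readers, the closest standard-library analogue is scheduling a task on the asyncio event loop. This is a rough comparison rather than an exact equivalent: BEAM processes are preemptively scheduled and fully isolated, while asyncio tasks cooperate on a single thread.

```python
import asyncio

async def compute() -> int:
    # The unit of work the "spawned" task performs
    return 1 + 2

async def show_spawn() -> int:
    # Roughly analogous to Elixir's `spawn fn -> 1 + 2 end`:
    # schedule the coroutine as a concurrent task, then await its result
    task = asyncio.create_task(compute())
    return await task

result = asyncio.run(show_spawn())
print(result)  # 3
```

Unlike Elixir's `spawn`, which fires and forgets, `create_task` hands back an awaitable handle; to truly fire-and-forget you would simply not await the task.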

Every process in Erlang/Elixir has a process id (pid) that identifies it among all the other processes. Let’s see how this can be handy:

def async_hello do
    send self(), {:hello, "world"}
    receive do
      {:hello, msg} -> msg
      {:whateva, msg} -> "won't match"
    end
  end

Here we see a function using self(), send and receive, constructs we haven’t met yet. Let’s fix that:

  • self() is a function that returns the pid of the calling process;
  • send has two parameters — receiver and message. It sends a message to the receiver;
  • receive uses pattern matching to make actions depending on the type of payload.

In our case, we send a message {:hello, "world"} to ourselves and immediately try to receive it.

By now you might have realized that having the receiver’s pid is quite important. Let’s look at the final example:

def async_parent_child do
    parent = self()
    spawn fn -> send parent, {:hello, self()} end
    IO.puts "My pid: " <> (inspect parent)
    receive do
      {:hello, pid} -> "Got hello from #{inspect pid}"
    end
  end

Here we combine the syntax constructs learned from the first two examples. We get the pid of the current (parent) process via `parent = self()`, then spawn a child process tasked with sending the parent a message containing its own pid: `send parent, {:hello, self()}`. After that, the only thing left is receiving and formatting the message: `receive do {:hello, pid} -> "Got hello from #{inspect pid}" end`.

10) Erlang/Elixir usage advantages

If we take a closer look at the problems mentioned in the Python section of the article, we’ll see that Erlang/Elixir solves them through a suitable runtime and a well-formed standard library. This approach is known as the Actor model. It isn’t the only solution (see Communicating Sequential Processes for an alternative), but it is a good one. So, in a nutshell, we were re-implementing in Python things that Erlang/Elixir already ship as part of their batteries. May I remind you of the addition to Greenspun’s tenth rule:

Any sufficiently complicated concurrent program in another language contains an ad-hoc, informally-specified, bug-ridden, slow implementation of half of Erlang.
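Indeed, the synchronization helpers above amount to a hand-rolled actor: each worker owns private state and reacts to messages arriving in a mailbox. A minimal Python rendition of that pattern (illustrative only, using a queue as the mailbox and a thread as the process) could be:

```python
import queue
import threading

class Actor:
    """A tiny actor: private state plus a mailbox, processed by one thread."""

    def __init__(self):
        self.mailbox: queue.Queue = queue.Queue()
        self.state: dict = {}
        threading.Thread(target=self._loop, daemon=True).start()

    def send(self, msg):
        # The only way to interact with the actor is via its mailbox
        self.mailbox.put(msg)

    def _loop(self):
        # Messages are handled one at a time, so state needs no locks
        while True:
            tag, payload, reply_to = self.mailbox.get()
            if tag == "set":
                self.state.update(payload)
            elif tag == "get":
                reply_to.put(dict(self.state))

actor = Actor()
actor.send(("set", {"status": "off"}, None))
reply: queue.Queue = queue.Queue()
actor.send(("get", None, reply))
state = reply.get(timeout=1)
print(state)  # {'status': 'off'}
```

Erlang gives you this pattern, plus distribution, supervision and preemptive scheduling, out of the box, which is precisely the point of the quote above.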

Conclusion

I am not saying that the Python solution to the presented problem is bad; if your company is full of expert Python engineers, it might even be the best one. What I am saying is that there are many approaches out there in the wild that might fit your project’s needs ideally, approaches you might not even be aware of. A programming language is just a tool, after all, and a good engineer should know their tools well.

“If I had nine hours to chop down a tree, I’d spend the first six sharpening my axe.”
— Abraham Lincoln (16th President of the United States)

“A good scientist is a person with original ideas. A good engineer is a person who makes a design that works with as few original ideas as possible. There are no prima donnas in engineering”
— Freeman Dyson (theoretical physicist and mathematician)

  • #Distributed Systems
  • #Erlang
  • #Python
  • #Quantum
  • #Software Development