Problem with streaming of real-time data

Hi Bokeh-Community,

I have a question regarding streaming of data in bokeh with python. It’s complicated so I will first just write down some important notes:

  • I want to display real-time sensor data with python and bokeh.
  • There will be a maximum of 8 new points per second.
  • Every time I add a new point I want to adapt the start and the end of the x_range of the figure.
  • It would be nice if I could commit every new point (x- and y-value) in a single step (self._bokeh_datasource.stream(new_data) → new_data should be just one point in the graph).
  • I want to illustrate the data in a line graph.
  • The graph should be able to illustrate up to 50000 points.

I have already realised a working solution but there are some problems. I have started a bokeh server which runs in the background (with the implementation of the figure and line). In another program I have pulled the session of the server and streamed new data into the ColumnDataSource of the bokeh server application. After streaming new data into the ColumnDataSource I also adjust the x_range (start and end). Now there are two problems:

  1. If I add one new point per second, my method works fine and always shows the newest data from the sensor. The only problem is that this implementation needs a lot of RAM: after adding 6000 points it uses roughly 200 MB. I don’t think the application really needs that much memory for just 6000 points. Do you know where the problem is? The application has to run on a single-board computer, so it shouldn’t need that much memory.
  2. How should I realise the livelog with up to 8 points per second? It would be very nice if I could add every single point within this second. One thing to think of is that I also want to adjust the x_range of the figure. This means that there will be 3 changes for the webserver per new point.

Here is an example application. First start bokeh_plot.py with

bokeh serve --show bokeh_plot.py --allow-websocket-origin=localhost --allow-websocket-origin=localhost:5006

# bokeh_plot.py:
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource, Range1d

# Column names must match the ones used by plot.line() and by stream()
datasource = ColumnDataSource(data=dict(x_axis=[], y_axis=[]))

plot = figure(plot_height=288, plot_width=577, tools="xpan", toolbar_location=None, x_range=Range1d(bounds=(0, None)))
plot.line(x='x_axis', y='y_axis', source=datasource, line_color=None, line_width=2, name="sam_line")

curdoc().add_root(plot)

After that you can start bokeh_program.py

# bokeh_program.py:
from bokeh.client import pull_session
import time
import random

bokeh_session = pull_session(url="http://localhost:5006/bokeh_plot", session_id="foo")
bokeh_session_document = bokeh_session.document
bokeh_graph = bokeh_session_document.get_model_by_name("sam_line")
bokeh_graph_datasource = bokeh_graph.data_source

bokeh_plot_figure = bokeh_session_document.roots[0]

x_value = 0

while True:
    y_value = random.random()

    new_data = {
        'x_axis': [x_value],
        'y_axis': [y_value],
    }

    bokeh_graph_datasource.stream(new_data)

    if x_value < 100:
        bokeh_plot_figure.x_range.start = 0
    else:
        bokeh_plot_figure.x_range.start = x_value - 100

    bokeh_plot_figure.x_range.end = x_value

    x_value += 1
    time.sleep(0.03)

(The sleep time is low so that it starts to lag faster)

You can see the plot with a webbrowser on http://localhost:5006/bokeh_plot?bokeh-session-id=foo

This example simulates my application. The used RAM of bokeh_program.py increases and the application starts to lag after some minutes.

Summary:
Question 1: Why is the used RAM increasing so fast?
Question 2: Is this the right way to implement my use case?

Thanks for your help in advance.

With best regards
Manuel

Hi @mabo, the first thing I will quickly note is that this kind of usage, where you call stream from a separate process outside the Bokeh server, is not really the intended usage.[1] In fact, I am somewhat surprised it works at all. In any case, connecting this way has a number of intrinsic disadvantages: it doubles the storage required for any data (since it is now duplicated in the outside process and in the Bokeh server), and it adds an entire additional network leg between the data and the browser (between the outside process and the Bokeh server). This kind of approach is explicitly discouraged by the developers of Bokeh.

So, I would first suggest trying to reorganize things so that all the logic is in the Bokeh server app, perhaps using a periodic callback in place of the loop.
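A minimal sketch of what that reorganization could look like, assuming the column names x_axis and y_axis from the example above. The names scroll_window, make_update and read_sensor are hypothetical and only serve as illustration; the callback works with any object that has a stream() method and any range with start and end attributes, which ColumnDataSource and Range1d provide.

```python
def scroll_window(x_latest, width=100):
    """Return (start, end) so that the view follows the newest x value."""
    return max(0, x_latest - width), x_latest


def make_update(source, x_range, read_point, width=100):
    """Build a callback that streams one point and moves the x-range.

    source     -- object with a .stream(dict) method (e.g. a ColumnDataSource)
    x_range    -- object with .start and .end attributes (e.g. a Range1d)
    read_point -- hypothetical callable returning one (x, y) pair
    """
    def update():
        x, y = read_point()
        source.stream({"x_axis": [x], "y_axis": [y]})
        x_range.start, x_range.end = scroll_window(x, width)
    return update


# Inside the server app (bokeh_plot.py) this could be registered roughly as:
#   curdoc().add_periodic_callback(
#       make_update(datasource, plot.x_range, read_sensor), 125)
# where 125 ms corresponds to 8 points per second.
```

With this layout the data lives only in the server process, so the separate bokeh_program.py and its pull_session connection are no longer needed.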

Alternatively, you might look at AjaxDataSource or ServerSentDataSource. Given that you are not actually relying on any real Python callbacks, using one of those would allow you to avoid running a Bokeh server at all.


  1. The intended usage of bokeh.client is to make one-time tweaks or changes to a session, e.g. to customize a Bokeh app that is embedded in a Flask app on a per-user basis, before the user sees it. ↩︎


Thanks for your help. I suspected that this isn’t a correct approach although it fits perfectly in my software. I will reorganize my logic and hope that this will solve my problems.

Hi,

I’ve reworked my logic and solved the problem with the high RAM usage. The RAM usage is now low and no longer a problem.

Now my problem is that the CPU usage of my Bokeh application grows constantly. Here is my code:

import time
import sys

from bokeh.layouts import column
from bokeh.models.widgets import CheckboxGroup, CheckboxButtonGroup
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource, Range1d
from random import randint
import bokeh.model

# Preprocessor
global ist_sam
ist_sam = False

# List of all global variables
global datenquelle, datenquelle_analyse
global plot, line_livelog, line_analyse
global checkbox_group

# Debug variables
global x_achse_zaehler
x_achse_zaehler = 0

def callback_datenquelle(attr, old, new):
    x_achse = datenquelle.data["x_achse"][-1]

    if 0 in checkbox_group.active:
        if x_achse < 100:
            plot.x_range.start = 0
        else:
            plot.x_range.start = x_achse - 100
        plot.x_range.end = x_achse

    plot.x_range.bounds = (0, x_achse + 1)

def get_random_values():
    global x_achse_zaehler
    x_achse_zaehler += 1

    y_achse_random_value = randint(0, 100)

    return str(x_achse_zaehler) + ";" + str(y_achse_random_value) + "\n"

def callback_endlosschleife():
    global datenquelle

    try:
        if ist_sam is True:
            # SAM method: read one line from the pipe
            input_from_PIPE = sys.stdin.readline()
            input_from_PIPE = str(input_from_PIPE)
        else:
            # Debug method: generate random values
            input_from_PIPE = get_random_values()

        if ";" in input_from_PIPE:
            x_achse, y_achse = input_from_PIPE.split(";")

            # str.replace returns a new string, so the result must be assigned
            y_achse = y_achse.replace("\n", "")

            new_data = {
                'x_achse': [int(x_achse)],
                'y_achse': [int(y_achse)],
            }

            datenquelle.stream(new_data)
            # datenquelle.stream(new_data, rollover=500)

    except Exception as e:
        pass

def erstelle_graph():
    initialisiere_datenquelle()
    initialisiere_plot()

def initialisiere_datenquelle():
    global datenquelle, datenquelle_analyse

    datenquelle = ColumnDataSource(data=dict(x_achse=[0], y_achse=[0]))
    datenquelle.on_change('data', callback_datenquelle)

def initialisiere_plot():
    global plot, datenquelle
    plot = figure(plot_height=288, plot_width=577, tools="xpan", toolbar_location=None, x_range=Range1d(bounds=("auto")))

    initialisiere_line()

def initialisiere_line():
    global plot, datenquelle, datenquelle_analyse, line_livelog, line_analyse

    line_livelog = plot.line(x='x_achse', y='y_achse', source=datenquelle, line_color=None, line_width=2, name="sam_linie")

def initialisiere_checkboxen():
    global checkbox_group

    # checkbox_group = CheckboxGroup(labels=["Mitscrollen", "Option 2", "Option 3"], active=[0, 1])
    checkbox_group = CheckboxButtonGroup(labels=["Mitscrollen"], active=[0])

def erstelle_document():
    global  plot, checkbox_group

    curdoc().add_root(column(checkbox_group, plot))
    curdoc().add_periodic_callback(callback_endlosschleife, 70)

# Start of the program
erstelle_graph()
initialisiere_checkboxen()
erstelle_document()

If you run this application, you will see that the CPU usage grows constantly.
I can solve this problem if I add a rollover to

datenquelle.stream(new_data) → datenquelle.stream(new_data, rollover=500)

It would be nice if I could run this app without a rollover. Do you have any suggestions to stop the CPU usage from growing so fast? Or is it expected that the CPU usage increases this quickly?
On my single-board computer I can only handle about 4500 values before the CPU is fully loaded and the display no longer keeps up with the real-time data (for this example I have replaced the sensor data with random values).

Thank you for your help I really appreciate it.

With best regards
mabo

The stream protocol is network-efficient, in that only the new data points are sent over the wire. But the new points still have to be appended to the existing data arrays (on both the Python and JS sides), an operation which gets more computationally expensive as the arrays get longer[1]. My first suggestion is to batch the stream updates so that you can call stream less frequently, i.e. collect N update values before calling stream with all of them:

new_data = {
    # these can be lists with more than one value
    'x_achse': [...],
    'y_achse': [...],
}

So for example you might leave the current periodic callback interval, and collect new data points every time, but only call stream every 5th invocation.
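One possible way to sketch this batching, assuming the column names x_achse and y_achse from the code above. StreamBatcher is a hypothetical helper, not part of Bokeh; it accepts any callable that takes a dict of column lists, for example the bound method datenquelle.stream.

```python
class StreamBatcher:
    """Collect points and forward them to `stream` in batches."""

    def __init__(self, stream, batch_size=5):
        self._stream = stream          # e.g. datenquelle.stream
        self._batch_size = batch_size  # points per stream() call
        self._x = []
        self._y = []

    def add(self, x, y):
        """Buffer one point; forward the batch once it is full."""
        self._x.append(x)
        self._y.append(y)
        if len(self._x) >= self._batch_size:
            self.flush()

    def flush(self):
        """Forward whatever is buffered, if anything, and start a new batch."""
        if not self._x:
            return
        self._stream({"x_achse": self._x, "y_achse": self._y})
        # Start fresh lists so the dict already handed over is not modified.
        self._x, self._y = [], []
```

In the periodic callback, datenquelle.stream(new_data) would then be replaced by a call such as batcher.add(int(x_achse), int(y_achse)), with batcher created once as StreamBatcher(datenquelle.stream, batch_size=5). The trade-off is that the plot updates in steps of five points instead of one.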


  1. On the Python side, if the data is in an actual list, then maybe not. But appending to a NumPy array, or to a TypedArray (which is always used on the JS side), requires making a copy and allocating new memory. ↩︎
