Pattern to progressively show the data as they become available?

My app displays data from multiple remote sources.
In the first cut, the plot won’t be shown until data from all sources are fetched. So the users need to wait a while, as some of the sources can be slow.

What I’d like to do is show a base plot, then progressively plot the data as they become available.

  1. fetch data in the background
  2. show the base plot to users.
  3. when data from a source is ready, plot them

What’d be a good way to do it? So far I could only approximate it by scheduling the remote data plots with doc.add_timeout_callback(). See the example below.

Thanks!


Example code:

notebook_url = "localhost:8888"

import asyncio
import time

import bokeh
from bokeh.models import Range1d
from bokeh.io import show, output_notebook
from bokeh.layouts import layout
from bokeh.plotting import figure, ColumnDataSource

print("bokeh v:", bokeh.__version__)

def create_plot_src_func(fig, label, delay_sec, color):
    # simulate fetching data remotely that takes time
    def get_remote_data():
        time.sleep(delay_sec)
        source = ColumnDataSource(data=dict(
            x=[delay_sec],
            y=[delay_sec],
        ))
        return source

    # start fetching remote data right away in background
    data_task = asyncio.create_task(asyncio.to_thread(get_remote_data))

    # return an async function that will get the remote data and plot them
    async def do_plot():
        source = await data_task
        fig.scatter(source=source, size=10 + (delay_sec ** 2) * 3, marker="square", fill_color=color)

    return do_plot


def test_bokeh_server(notebook_url="localhost:8888"):
    async def async_create_interact_ui():
        fig = figure(
            title="Test Bokeh Progressive Plot",
        )

        fig.x_range = Range1d(0, 10)
        fig.y_range = Range1d(0, 10)

        # approximate a progressive plot by ordering the functions by expected execution time
        plot_fns = [
            create_plot_src_func(fig, "fast", 1, "blue"),
            create_plot_src_func(fig, "slow", 3, "green"),
            create_plot_src_func(fig, "slowest", 6, "red"),
        ]
        return fig, plot_fns


    def create_interact_ui(doc):
        async def do_create_ui():
            fig, plot_src_fns = await async_create_interact_ui()
            # first show the base figure to users
            doc.add_root(layout([
                [fig, ]
            ]))
            # then progressively show data as they become available
            for i, fn in enumerate(plot_src_fns):
                doc.add_timeout_callback(fn, i * 100 + 500)

        doc.add_next_tick_callback(do_create_ui)


    output_notebook(verbose=False, hide_banner=True)
    show(create_interact_ui, notebook_url=notebook_url)

test_bokeh_server(notebook_url=notebook_url)

@orionlee I am not sure I understand your question. Your code seems to function. Are you asking for ways to improve it, or something else?

Generally speaking, you’d want to set up your plots and all your glyphs once, up front, with empty data sources, if possible. Then a periodic callback that “pulls” data can update the Bokeh ColumnDataSource for the glyphs you created. Alternatively, if you have data “pushed” in from an external source, then some async callback could also update a CDS in response.
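
A minimal sketch of the "set up once, update later" pattern described above, assuming each remote source eventually yields a dict of x and y lists. The names make_base_plot and on_data_ready are hypothetical and only for illustration; in a running Bokeh server app the update would have to happen inside a callback that holds the document lock.

```python
from bokeh.models import ColumnDataSource
from bokeh.plotting import figure


def make_base_plot(labels_and_colors):
    """Create the figure and one glyph per remote source, all with empty data."""
    fig = figure(title="Progressive plot", x_range=(0, 10), y_range=(0, 10))
    sources = {}
    for label, color in labels_and_colors.items():
        source = ColumnDataSource(data=dict(x=[], y=[]))
        fig.scatter(x="x", y="y", source=source, size=10,
                    marker="square", fill_color=color)
        sources[label] = source
    return fig, sources


def on_data_ready(sources, label, data):
    """Fill the pre-created source; the glyph itself was already set up."""
    sources[label].data = data


fig, sources = make_base_plot({"fast": "blue", "slow": "green", "slowest": "red"})
# Later, whenever one source's data arrives (hypothetical values):
on_data_ready(sources, "fast", dict(x=[1], y=[1]))
```

With this layout every update does the same thing, namely replacing the data of an existing source, regardless of which source finishes first.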

Thanks for looking into it.

My code works, but it only approximates showing data as soon as they become available, because a slow data source can still block the sources that have not been rendered yet.

E.g., in my code, I ordered the source update functions by the typical running time of each source, from the fastest to the slowest:

        plot_fns = [
            create_plot_src_func(fig, "fast", 1, "blue"),
            create_plot_src_func(fig, "slow", 3, "green"),
            create_plot_src_func(fig, "slowest", 6, "red"),
        ]

For example, if the fast source is suddenly slow, taking 9 seconds (instead of 1 second) to fetch the data, it’d block the remaining sources from being rendered.

I think I understand the reason but not the solution. When a function passed to add_timeout_callback() executes, the Document object is locked, effectively blocking other functions from updating the plot.
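
To illustrate just the ordering part of the problem in plain asyncio (leaving Bokeh's document lock aside), here is a small sketch that handles results in completion order instead of a fixed, guessed order. The function names and delays are hypothetical.

```python
import asyncio


async def fetch(label, delay_sec):
    """Stand-in for a remote fetch; returns the label when done."""
    await asyncio.sleep(delay_sec)
    return label


async def handle_in_completion_order(specs):
    """Start every fetch at once, then handle each result as soon as it is ready."""
    tasks = [asyncio.create_task(fetch(label, delay)) for label, delay in specs]
    handled = []
    for next_done in asyncio.as_completed(tasks):
        label = await next_done
        handled.append(label)  # a real app would schedule the plot update here
    return handled


# "fast" is unexpectedly the slowest here, but it no longer holds up the others.
order = asyncio.run(handle_in_completion_order(
    [("fast", 0.3), ("slow", 0.1), ("slowest", 0.2)]
))
```

This only shows that asyncio.as_completed() removes the dependency on the guessed ordering; how each result gets applied to the document is a separate question.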


For the suggestion of using periodic callbacks, do you mean to have the source update function fired off periodically, and unregister the callback once it’s done? Something like

# the function to be passed to add_periodic_callback()
def a_source_update_func():
    # data_task: a background asyncio task fetching the remote data
    if not data_task.done():
        return

    data = data_task.result()
    # update the source / plots using `data`
    # ...


    # work is done. Unregister the callback from the Document
    callback_obj = ...  # somehow locate the callback object
    curdoc().remove_periodic_callback(callback_obj)
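
For what it's worth, doc.add_periodic_callback() returns the callback object, so it can be kept in a closure and passed to remove_periodic_callback() later. Below is a sketch of that idea, assuming the fetch runs in a thread pool and produces a dict for the source. poll_until_ready is a hypothetical helper, and the bare Document() only stands in for the doc that a real app function receives.

```python
from concurrent.futures import ThreadPoolExecutor

from bokeh.document import Document
from bokeh.models import ColumnDataSource


def poll_until_ready(doc, future, source, period_ms=200):
    """Poll `future` periodically; apply its result once, then unregister."""
    def check():
        if not future.done():
            return  # not ready yet, try again on the next period
        source.data = future.result()
        doc.remove_periodic_callback(callback_obj)

    # add_periodic_callback returns the object needed for removal
    callback_obj = doc.add_periodic_callback(check, period_ms)
    return check  # returned only so the sketch can be exercised without a server


doc = Document()  # stand-in; in a real app, use the `doc` passed to the app function
source = ColumnDataSource(data=dict(x=[], y=[]))
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(lambda: dict(x=[1], y=[1]))  # stand-in for a slow fetch
    check = poll_until_ready(doc, future, source)
```

Each source would get its own periodic callback, so a slow source only delays its own glyph.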

You also said one typically sets up the glyphs and the empty data source up front.
What’d be the disadvantage if I create the data source and the glyphs only after the remote data is fetched?

@orionlee thanks for the clarification. When you said “slow callback” I thought you meant that new data came in slowly, i.e. the callback would get called infrequently, not that the callback itself took a very long time to execute.

I think you’ll want to use the technique from this part of the docs:

https://docs.bokeh.org/en/latest/docs/user_guide/server/app.html#updating-from-unlocked-callbacks

which demonstrates how you can release the document locks and update from threads to avoid blocking the rest of the application.

“What’d be the disadvantage if I create the data source and the glyphs only after the remote data is fetched?”

Well, for one thing, your “first callback” will have to do more/different work than all the subsequent invocations. IMO it’s preferable to set things up so that callbacks always do the same thing on every invocation. But it’s also just the usage scenario Bokeh is geared towards. Pretty much every interactive update example in the docs follows this pattern. It is tried and well-tested at this point.

Thanks for the tips. Looking at the doc, I think this part would be slightly more appropriate for my case: fetch data in a background thread and schedule the actual bokeh update with doc.add_next_tick_callback():

https://docs.bokeh.org/en/latest/docs/user_guide/server/app.html#updating-from-threads

In my case, I don’t have new data coming in over time. Instead, I pull data from multiple remote sources (each source is fetched once to its own respective ColumnDataSource): using source.stream() is unnecessary as a source’s data is constant once fetched.
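
A sketch of what I have in mind, assuming one worker thread per source and a one-time assignment to each source's data. set_data, fetch_then_schedule and start_fetches are hypothetical helper names, and time.sleep() stands in for the remote fetch.

```python
import threading
import time
from functools import partial

from bokeh.document import Document
from bokeh.models import ColumnDataSource


def set_data(source, data):
    """Runs later on the server's event loop, with the document lock held."""
    source.data = data


def fetch_then_schedule(doc, source, delay_sec):
    """Runs in a worker thread; must not touch Bokeh models directly."""
    time.sleep(delay_sec)  # stand-in for one slow remote fetch
    data = dict(x=[delay_sec], y=[delay_sec])
    # per the Bokeh docs, add_next_tick_callback is the one document
    # operation that is safe to perform from another thread
    doc.add_next_tick_callback(partial(set_data, source, data))


def start_fetches(doc, sources_and_delays):
    """Start one daemon thread per source and return the threads."""
    threads = [
        threading.Thread(target=fetch_then_schedule,
                         args=(doc, source, delay), daemon=True)
        for source, delay in sources_and_delays
    ]
    for thread in threads:
        thread.start()
    return threads


doc = Document()  # stand-in; in a real app, use the `doc` passed to the app function
fast = ColumnDataSource(data=dict(x=[], y=[]))
slow = ColumnDataSource(data=dict(x=[], y=[]))
threads = start_fetches(doc, [(fast, 0.01), (slow, 0.05)])
```

Since each source is assigned exactly once, a plain assignment to source.data is enough and source.stream() is not needed.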

Right, I was not suggesting to use stream necessarily, merely pointing out that blocking updates could release the document lock if need be.

I’ve updated and deployed my code using a variant of:
https://docs.bokeh.org/en/latest/docs/user_guide/server/app.html#updating-from-unlocked-callbacks
and ran into an intermittent issue (~5 incidents out of 35 requests, according to the server logs) that I have not been able to reproduce.

Intermittently, an AttributeError: 'DocumentCallbackManager' object has no attribute '_change_callbacks' is raised
at the line that schedules the locked update with doc.add_next_tick_callback(partial(do_catalog_init_locked, result=result)).

Does it ring a bell?

Stacktrace:

Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x3e75e76593d0>>, <Task finished name='Task-72' coro=<async_parse_and_add_catalogs_figure_elements.<locals>.create_catalog_plot_fn.<locals>.do_catalog_init_unlocked() done, defined at /usr/local/lib/python3.11/site-packages/bokeh/document/locking.py:91> exception=AttributeError("'DocumentCallbackManager' object has no attribute '_change_callbacks'")>)
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/tornado/ioloop.py", line 750, in _run_callback
    ret = callback()
          ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tornado/ioloop.py", line 774, in _discard_future_result
    future.result()
  File "/usr/local/lib/python3.11/site-packages/bokeh/document/locking.py", line 93, in _wrapper
    await func(*args, **kw)
  File "/app/tpf/lk_patch/interact.py", line 761, in do_catalog_init_unlocked
    doc.add_next_tick_callback(partial(do_catalog_init_locked, result=result))
  File "/usr/local/lib/python3.11/site-packages/bokeh/document/document.py", line 270, in add_next_tick_callback
    return self.callbacks.add_session_callback(cb, callback, one_shot=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/bokeh/document/callbacks.py", line 193, in add_session_callback
    self.trigger_on_change(SessionCallbackAdded(doc, callback_obj))
  File "/usr/local/lib/python3.11/site-packages/bokeh/document/callbacks.py", line 423, in trigger_on_change
    invoke_with_curdoc(doc, invoke_callbacks)
  File "/usr/local/lib/python3.11/site-packages/bokeh/document/callbacks.py", line 453, in invoke_with_curdoc
    return f()
           ^^^
  File "/usr/local/lib/python3.11/site-packages/bokeh/document/callbacks.py", line 421, in invoke_callbacks
    for cb in self.change_callbacks():
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/bokeh/document/callbacks.py", line 383, in change_callbacks
    return tuple(self._change_callbacks.values())
                 ^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'DocumentCallbackManager' object has no attribute '_change_callbacks'

Version: Bokeh server version 3.6.1 (running on Tornado 6.4.2)

I haven’t seen that before. Unfortunately I cannot speculate without actual complete code to run and investigate directly.