Updating bokeh figure from asynchronous computation with panel

See also the topic over on the Holoviz discourse:

I would like to make a panel application which does asynchronous computation using a dask cluster where the results of the computation are displayed in Bokeh figures.

To update the bokeh document safely (see bokeh docs Updating From Threads and Unlocked Callbacks), the callback that updates the bokeh document should be scheduled via add_next_tick_callback.

If I don’t do this, and instead call the callback cb directly, I get the error:

raise RuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")
RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes

However, when I do use add_next_tick_callback, nothing happens and it seems that the callback isn’t called.
This might be a panel specific issue but I thought I’d post it here as I might be doing things wrong on the bokeh side of things.
Do I need to use without_document_lock()? I have tried adding this as well, but it didn’t help. Is there another way of safely scheduling bokeh updates?

If you want to run the code below you need to run a dask LocalCluster and update the ip:port value for cluster.

import panel as pn
import param
import time
import numpy as np
from panel.io import unlocked
from tornado.ioloop import IOLoop
from dask.distributed import Client, as_completed
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
from functools import partial

class ASyncProgressBar(param.Parameterized):
    """Progress indicator that counts futures as they finish.

    ``completed`` is the number of finished tasks and ``num_tasks`` the total
    (at least 1, so the percentage in ``value`` never divides by zero).
    """

    completed = param.Integer(default=0)
    num_tasks = param.Integer(default=10, bounds=(1, None))

    async def run(self, futures):
        """Bump ``completed`` once for every future yielded by ``as_completed``."""
        async for _finished in as_completed(futures):
            with unlocked():
                self.completed = self.completed + 1

    @property
    def value(self):
        """Completed share of the tasks as a truncated integer percentage."""
        fraction = self.completed / self.num_tasks
        return int(100 * fraction)

    def reset(self):
        """Set the completed-task counter back to zero."""
        self.completed = 0

    @param.depends('completed', 'num_tasks')
    def view(self):
        """Return a Progress widget, or ``None`` while the percentage is zero."""
        percent = self.value
        if not percent:
            return None
        return pn.widgets.Progress(active=True, value=percent, align="center", sizing_mode="stretch_width")


def task_func(arg):
    """Simulate a blocking unit of work and hand ``arg`` back unchanged.

    Stands in for an ordinary (non-async) function that is called many times.
    """
    # randint's upper bound is exclusive, so randint(1, 2) always yields 1.
    delay = np.random.randint(1, 2)
    time.sleep(delay)
    return arg


class ASyncExample(param.Parameterized):
    """Panel demo: run tasks on a dask cluster and plot the results with bokeh.

    Pressing ``do_stuff`` starts an asynchronous computation on the cluster
    whose address was passed to the constructor; its progress is reported by an
    ``ASyncProgressBar`` and the gathered values are plotted in a bokeh figure.
    ``normal_stuff`` updates the same figure synchronously. The ``select`` and
    ``slider`` parameters feed ``text`` and are meant to stay responsive while
    the background computation is running.
    """

    select = param.Selector(objects=range(10))
    slider = param.Number(2, bounds=(0, 10))
    text = param.String()

    do_stuff = param.Action(lambda self: self._do_calc())
    normal_stuff = param.Action(lambda self: self._do_normal())
    result = param.Number(0)

    def __init__(self, cluster, **params):
        """Build the widgets and the bokeh figure.

        Args:
            cluster: Value handed to ``dask.distributed.Client`` as the
                cluster to connect to (an ``ip:port`` string in this example).
            **params: Forwarded to ``param.Parameterized``.
        """
        super().__init__(**params)
        self.async_pbar = ASyncProgressBar()
        self.cluster = cluster

        # Initial value only; _do_calc() replaces it with the document that is
        # current when the button is pressed.
        self.doc = curdoc()
        self.source = ColumnDataSource({'x': range(10), 'y': range(10)})
        self.figure = figure()
        self.figure.line(x='x', y='y', source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)

        self.col = pn.Column(
            pn.pane.Markdown("## Starts async background process"),
            pn.Param(
                self,
                parameters=["do_stuff", 'normal_stuff', "result"],
                widgets={"result": {"disabled": True}, "do_stuff": {"button_type": "success"}},
                show_name=False,
            ),
            self.async_pbar.view,
            pn.layout.Divider(),
            pn.pane.Markdown("## Works while background process is running"),
            pn.Param(
                self,
                parameters=["select", "slider", "text"],
                widgets={"text": {"disabled": True}},
                show_name=False,
            ),
            self.bk_pane,
            max_width=500,
        )

    @param.depends("slider", "select", watch=True)
    def _on_slider_change(self):
        """Write ``slider + select`` to ``text`` (a falsy ``select`` counts as 0)."""
        # This function runs other Python code which we want to keep responsive.
        select = self.select or 0
        self.text = str(self.slider + select)

    def _do_normal(self):
        """Update the bokeh graph synchronously with ten random integers."""
        self.update_source(list(np.random.randint(0, 2**31, 10)))

    def update_source(self, data):
        """Replace the ``y`` column of the plotted data source with ``data``."""
        self.source.data.update({'y': data})

    async def overall_func(self, num_tasks):
        """Run ``num_tasks`` tasks on the cluster and publish the results.

        The tasks are submitted asynchronously and handed to the progress bar,
        which reports on their completion. In a second step the futures are
        combined into their mean, which is written to ``result``, and the
        gathered values are scheduled to be plotted via the document's
        ``add_next_tick_callback``.

        Args:
            num_tasks: Number of ``task_func`` calls to submit.
        """
        try:
            # The context manager closes the client again; previously a new
            # client was created on every button press and never closed.
            async with Client(self.cluster, asynchronous=True) as client:
                futures = client.map(task_func, np.random.randint(0, 2**31, num_tasks))
                await self.async_pbar.run(futures)
                result = await client.submit(np.mean, futures)
                data = await client.gather(futures)

            # The bokeh models must be changed from a callback scheduled on the
            # document rather than directly from this coroutine.
            self.doc.add_next_tick_callback(partial(self.update_source, data))

            with unlocked():
                self.result = result
        finally:
            # Re-enable the button and clear the progress bar even when the
            # computation failed, so the UI is not left stuck.
            with unlocked():
                self.param['do_stuff'].constant = False
                self.async_pbar.reset()

    def _do_calc(self):
        """Disable the button and schedule ``overall_func`` on the IOLoop."""
        # Capture the document at the moment the button is pressed. Scheduling
        # on the document captured in __init__ resulted in the next-tick
        # callback never running (presumably the served session uses a
        # different document than the one current at construction time).
        self.doc = curdoc()
        self.param['do_stuff'].constant = True
        num_tasks = 10
        self.async_pbar.num_tasks = num_tasks
        IOLoop.current().add_callback(self.overall_func, num_tasks)

    def panel(self):
        """Return the top-level panel layout of the example."""
        return self.col


# Path to dask LocalCluster, which can be started from a separate Python console:
# >>> from dask.distributed import LocalCluster
# >>> cluster = LocalCluster()
# >>> cluster
# This returns the IP:port to use

# Default sizing mode applied through the panel configuration.
pn.config.sizing_mode = "stretch_width"

# Placeholder: replace with the ip:port of the running dask cluster.
cluster = '<ip:port for cluster>'

async_example = ASyncExample(cluster)
layout = async_example.panel()
pn.serve(layout)

1 Like

I can’t figure out how to edit so I’ll reply instead.
I’ve fixed the example above by saving a reference to the document right after the button is pressed.

    def _do_calc(self):
        self.doc = curdoc()  # Insert this line
        self.param['do_stuff'].constant = True
1 Like