See also the corresponding topic on the HoloViz Discourse:
I would like to build a Panel application that performs asynchronous computation on a Dask cluster and displays the results of that computation in Bokeh figures.
To update the Bokeh document safely (see the Bokeh docs sections "Updating From Threads" and "Unlocked Callbacks"), the callback that updates the Bokeh document should be scheduled via `add_next_tick_callback`.
If I don’t do this and instead call the callback `cb` directly, I get the error:
raise RuntimeError("_pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes")
RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes
However, when I do use `add_next_tick_callback`, nothing happens; it seems that the callback is never called.
This might be a Panel-specific issue, but I thought I’d post it here in case I am doing something wrong on the Bokeh side.
Do I need to use `without_document_lock()`? I have tried adding it as well, but it didn’t help. Is there another way of safely scheduling Bokeh updates?
If you want to run the code below, you need to start a Dask `LocalCluster` and set the `cluster` variable to its ip:port address.
import panel as pn
import param
import time
import numpy as np
from panel.io import unlocked
from tornado.ioloop import IOLoop
from dask.distributed import Client, as_completed
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
from functools import partial
class ASyncProgressBar(param.Parameterized):
    """Progress indicator driven by finished Dask futures.

    :meth:`run` bumps ``completed`` once per finished future, and
    :meth:`view` renders a Panel ``Progress`` widget from the resulting
    percentage (nothing is rendered while that percentage is zero).
    """

    completed = param.Integer(default=0)
    num_tasks = param.Integer(default=10, bounds=(1, None))

    async def run(self, futures):
        """Await ``futures`` in completion order, counting each one as it finishes."""
        async for _finished in as_completed(futures):
            # This coroutine runs outside a Bokeh event callback; Panel's
            # ``unlocked()`` is used so the re-render triggered by the
            # ``completed`` change can be pushed from here.
            with unlocked():
                self.completed = self.completed + 1

    @property
    def value(self):
        """Finished-task percentage, truncated to an int (``num_tasks`` is bounded >= 1)."""
        fraction_done = self.completed / self.num_tasks
        return int(fraction_done * 100)

    def reset(self):
        """Set the finished-task counter back to zero."""
        self.completed = 0

    @param.depends('completed', 'num_tasks')
    def view(self):
        """Return a ``Progress`` widget, or ``None`` while the percentage is zero."""
        percent = self.value
        if not percent:
            return None
        return pn.widgets.Progress(
            active=True,
            value=percent,
            align="center",
            sizing_mode="stretch_width",
        )
def task_func(arg, min_delay=1.0, max_delay=2.0):
    """Return ``arg`` unchanged after blocking for a random delay.

    This is a normal blocking function that we want to call many times on
    the Dask cluster.

    Parameters
    ----------
    arg
        Value returned unchanged.
    min_delay, max_delay : float, optional
        Bounds, in seconds, of the uniformly drawn sleep duration.

    Returns
    -------
    The same object that was passed as ``arg``.
    """
    # np.random.randint(1, 2) excludes its upper bound and therefore always
    # returned 1; a uniform draw gives the intended 1-2 s spread.
    time.sleep(np.random.uniform(min_delay, max_delay))
    return arg
class ASyncExample(param.Parameterized):
    """Panel app that runs blocking tasks on a Dask cluster without freezing the UI.

    ``do_stuff`` maps ``task_func`` over random integers on the cluster,
    reports progress through an ``ASyncProgressBar``, then plots the gathered
    values and stores their mean in ``result``. ``normal_stuff`` updates the
    same plot synchronously. ``select``/``slider``/``text`` are meant to stay
    responsive while the background work runs.
    """

    select = param.Selector(objects=range(10))
    slider = param.Number(2, bounds=(0, 10))
    text = param.String()
    do_stuff = param.Action(lambda self: self._do_calc())
    normal_stuff = param.Action(lambda self: self._do_normal())
    result = param.Number(0)

    def __init__(self, cluster, **params):
        """Build the Bokeh plot and the Panel layout.

        Parameters
        ----------
        cluster
            Passed as the first argument to ``dask.distributed.Client``
            (the script uses the scheduler's ``"ip:port"`` string).
        **params
            Forwarded to ``param.Parameterized``.
        """
        super().__init__(**params)
        self.async_pbar = ASyncProgressBar()
        self.cluster = cluster
        # Kept for backward compatibility only. When the instance is created
        # outside a Bokeh server session (e.g. at import time), this is not a
        # served document, so callbacks added to it presumably never run.
        # Updates are scheduled on ``self.source.document`` instead.
        self.doc = curdoc()
        self.source = ColumnDataSource({'x': range(10), 'y': range(10)})
        self.figure = figure()
        self.figure.line(x='x', y='y', source=self.source)
        self.bk_pane = pn.pane.Bokeh(self.figure)
        self.col = pn.Column(
            pn.pane.Markdown("## Starts async background process"),
            pn.Param(
                self,
                parameters=["do_stuff", 'normal_stuff', "result"],
                widgets={"result": {"disabled": True}, "do_stuff": {"button_type": "success"}},
                show_name=False,
            ),
            self.async_pbar.view,
            pn.layout.Divider(),
            pn.pane.Markdown("## Works while background process is running"),
            pn.Param(
                self,
                parameters=["select", "slider", "text"],
                widgets={"text": {"disabled": True}},
                show_name=False,
            ),
            self.bk_pane,
            max_width=500,
        )

    @param.depends("slider", "select", watch=True)
    def _on_slider_change(self):
        """Stand-in for other Python work that should stay responsive."""
        select = self.select or 0
        self.text = str(self.slider + select)

    def _do_normal(self):
        """Update the Bokeh graph synchronously with random values."""
        self.update_source(list(np.random.randint(0, 2**31, 10)))

    def update_source(self, data):
        """Plot ``data`` as y values against x = 0 .. len(data) - 1.

        Both columns are replaced together so they always have equal length,
        even when ``data`` does not contain exactly 10 values.
        """
        ys = list(data)
        self.source.data = {'x': list(range(len(ys))), 'y': ys}

    def _schedule_source_update(self, data):
        """Apply ``update_source(data)`` via the document that owns ``self.source``."""
        doc = self.source.document
        if doc is None:
            # The source is not attached to any document, so there is no
            # document lock to respect; update it directly.
            self.update_source(data)
        else:
            # Schedule on the document the source actually belongs to rather
            # than on ``self.doc``, which may not be a served session document.
            doc.add_next_tick_callback(partial(self.update_source, data))

    async def overall_func(self, num_tasks):
        """Run ``num_tasks`` tasks on the cluster and publish the results.

        The tasks are sent out asynchronously and reported to the progress
        bar as they complete. Their results are then plotted, and their mean
        (computed on the cluster) is stored in ``result``. The ``do_stuff``
        button is re-enabled and the progress bar reset even if a step fails.
        """
        try:
            # ``async with`` closes the client afterwards; previously a new,
            # never-closed client was created on every click.
            async with Client(self.cluster, asynchronous=True) as client:
                futures = client.map(task_func, np.random.randint(0, 2**31, num_tasks))
                await self.async_pbar.run(futures)
                result = await client.submit(np.mean, futures)
                data = await client.gather(futures)
            self._schedule_source_update(data)
            with unlocked():
                self.result = result
        finally:
            with unlocked():
                self.param['do_stuff'].constant = False
                self.async_pbar.reset()

    def _do_calc(self):
        """Disable ``do_stuff`` and schedule ``overall_func`` on the current IOLoop."""
        self.param['do_stuff'].constant = True
        num_tasks = 10
        self.async_pbar.num_tasks = num_tasks
        IOLoop.current().add_callback(self.overall_func, num_tasks)

    def panel(self):
        """Return the app's Panel layout."""
        return self.col
# Address of the Dask scheduler to connect to. A LocalCluster can be started
# from a separate Python console:
# >>> from dask.distributed import LocalCluster
# >>> cluster = LocalCluster()
# >>> cluster
# Evaluating ``cluster`` displays the scheduler's IP:port; put that value in
# place of the placeholder below.
cluster = '<ip:port for cluster>'
# Set before the app is constructed so the layouts created below pick it up.
pn.config.sizing_mode = "stretch_width"
# NOTE(review): the app object is built at import time, outside any Bokeh
# server session, so the curdoc() call in ASyncExample.__init__ presumably
# does not return a served session's document.
async_example = ASyncExample(cluster)
pn.serve(async_example.panel())