[BUG] background thread stops when session is destroyed, leading to weird errors

Crosspost on Panel discourse:

I’m trying to set a background process that will continuously gather new data and put it in the panel cache. It then uses Dask futures to compute the results in a non-blocking fashion.

That works.

However, once the session is closed (browser tab closed or connection loss), I get strange errors like:
NameError: name 'copy' is not defined
or
NameError: name 'print' is not defined.

Minimal reproducible example:

from bokeh.plotting import figure, curdoc
from dask.distributed import Client
import time, threading, copy, param
import panel as pn
import holoviews as hv

pn.extension()
hv.extension('bokeh')

def dask_fn(entry,counter):
    print('Doing some heavy work with dask')
    time.sleep(3)
    return entry
    
def dask_callback(future):
    if future.status == 'finished':
        entry = future.result()
        print('Future finished successfully')
        del pn.state.cache['futures'][entry]
        print(f'Deleted future for {entry} from cache')
        
    # If future fails, it will automatically retry
    else:
        print('future failed after 10 retries')   


def BG_fn():
    counter = 0
    while True:
        counter += 1
        time.sleep(0.5)
        tickers = copy.deepcopy(list(pn.state.cache['data'].keys()))
        for entry in tickers:
            futures = copy.deepcopy(list(pn.state.cache['futures'].keys()))
            if not entry in futures:
                future = pn.state.cache['dask_client'].submit(dask_fn,entry,counter,retries=10)
                pn.state.cache['futures'][entry] = future
                future.add_done_callback(dask_callback)

# Data cache
if not 'data' in pn.state.cache:
    # Init doc & client in cache
    pn.state.cache['doc'] = curdoc()
    pn.state.cache['dask_client'] = Client(processes=False)
    
    # Init caches
    pn.state.cache['data'] = {}
    pn.state.cache['futures'] = {}
    
    # Init sample data
    pn.state.cache['data']['test'] = 'lalala'
    
    # Start periodic callback as thread
    thread = threading.Thread(target=BG_fn,args={})
    thread.daemon = True
    thread.start()
    
    pn.state.cache['BG_process'] = thread

class App(param.Parameterized):   
    def __init__(self, **params):
        super().__init__(**params)
        # Init important vars
        self.client = pn.state.cache['dask_client']
        self.doc = curdoc()
        self.plot = pn.pane.HoloViews(self.hv_plot)
    
    @property
    def hv_plot(self): 
        return hv.Curve([1,2,3])
        
app = App(name='')

app.plot.servable()

Console output:

2021-10-19 17:49:35,776 Starting Bokeh server version 2.4.1 (running on Tornado 6.1)
2021-10-19 17:49:35,778 User authentication hooks NOT provided (default user enabled)
2021-10-19 17:49:35,780 Bokeh app running at: http://localhost:5008/bug
2021-10-19 17:49:35,780 Starting Bokeh server with process id: 1638246
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
Future finished successfully
Deleted future for test from cache
Doing some heavy work with dask
distributed.client - ERROR - Error in callback <function dask_callback at 0x7f5b1a505a60> of <Future: finished, type: str, key: dask_fn-91da05f4aae28bbb3bb1232c7fef7ff3>:
Traceback (most recent call last):
File “/root/anaconda3/lib/python3.8/site-packages/distributed/client.py”, line 298, in execute_callback
fn(fut)
File “/root/mnt3/02_Data_Serving/Development/bug.ipynb”, line 24, in dask_callback
“def dask_callback(future):\n”,
NameError: name ‘print’ is not defined
Exception in thread Thread-1:
Traceback (most recent call last):
File “/root/anaconda3/lib/python3.8/threading.py”, line 932, in _bootstrap_inner
self.run()
File “/root/anaconda3/lib/python3.8/threading.py”, line 870, in run
self._target(*self._args, **self._kwargs)
File “/root/mnt3/02_Data_Serving/Development/bug.ipynb”, line 38, in BG_fn
" while True:\n",
NameError: name ‘copy’ is not defined

I’ve consulted the docs, but I’m not entirely sure what exactly happens once a session is destroyed and how this is causing issues with ongoing background threads, and how it could be fixed.

Any ideas?

When session is closed the module created for it is also deleted to prevent memory leaks, and I expect this is the source of what you are seeing. We have gotten several issues/complaints about memory leaks over the years, so we are now extremely aggressive with cleanup. This is not going to change. If you really need to have a permanent thread running, you will need to start it outside of any particular session. The spectrogram example shows how that might be accomplished with app_hooks.py but that is standard Bokeh usage. You would have to aske the Holoviz folks how to do something thing similar with their APIs.

1 Like

Perfect, this works! Many thanks! :smiley:

The trick is to execute all the initialization inside the app_hooks.py file, and catch the exception on the callback to the parameterized app once the session expires. What used to happen when I refreshed the page is that all sessions would freeze, but now everything continues to function even after a session has been destroyed.

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.