Document not registering callback initiated from asynchronous socket connection handler

Hi all. I have a Bokeh app running through a Flask server, with data updates from another process communicated via TCP. The data is sent and received fine, but the process (or thread?) does not seem to respond to curdoc().add_next_tick_callback(partial(source_update, new_data)). Everything up to this point works, but the code inside the callback is never executed. I assume this has something to do with threading and the Tornado IOLoop, though I am a bit out of my depth here.

Here is some code that demonstrates the bare-minimum setup. I’ve left out the socket protocol, Flask serving, imports, etc.; the posted code shows only the relevant parts. If this is not enough and a full reproducible example is needed, I can provide one. I’m hoping there is a fundamental misuse here that someone will be able to spot. Instead of a data update, the callback here just prints something to show that the code is being run. It never runs unless I call it directly (source_update()), in which case there is a pending_write document lock error when trying to change the actual source object. I’ve tried many permutations of moving the functions and decorators around and of using @without_document_lock. No luck.

Is anyone able to shed light on this?

Cheers 🙂

doc = curdoc()
def home_app(doc): # Random widget for the sake of having a function
    # Select expects string values and options
    select = Select(title='Choose log', value='0', options=['0', '1', '2', '3', '4'])
    doc.add_root(column([select]))

@gen.coroutine
def source_update(new_data): # This is the function I would like to call with curdoc().add_next_tick
    print('Inside')
    print(new_data)


async def receive_async(conn): # when the socket handler receives a connection, it calls this function.
    stream = IOStream(conn)
    print(f"Connected to {conn.getpeername()}")
    msg = await stream.read_until_close()
    msgs = msg_cleaver(msg)
    for msg in msgs:
        print(msg.msg)
    doc.add_next_tick_callback(partial(source_update, msgs))


def sock_handler(dataserver, fd, events): # listens for connections and calls the receiving function
    sock = dataserver
    conn, addr = sock.accept()
    conn.setblocking(0)
    io_loop = tornado.ioloop.IOLoop.current()
    io_loop.spawn_callback(receive_async, conn)


def bk_worker(): #Function called by Thread - following reference code for flask serving
    dataserver = make_server() # returns a simple socket server 
    loop = tornado.ioloop.IOLoop()
    loop.add_handler(dataserver.fileno(), partial(sock_handler, dataserver), loop.READ)
    server = Server({'/': home_app}, io_loop=loop, allow_websocket_origin=["localhost:5000", '127.0.0.1:5000'])
    server.start() # must come first: io_loop.start() blocks until the loop stops
    server.io_loop.start()


Thread(target=bk_worker).start()

Hi @Han please edit your post to use code formatting.

My bad! Done.

So the fundamental issue is that the doc = curdoc() global variable referenced in your receive_async is not at all the same Document that ends up servicing a user session when someone connects with a browser. The purpose of a “Bokeh app” is to be a factory for documents. The app code in home_app is called every time a connection is made, in order to populate a new document just for that session. In other words, when Bokeh calls home_app, it is not with the doc variable at the top of your script. TL;DR: the doc.add_next_tick_callback has been added to a document that no one ever actually sees.

There are a couple of different possibilities.

  • If receive_async has access to the Server object, it could loop through all the current active sessions, and add the callback to all the session documents. That would support a “push” model.

  • You could use the receive_async to update a mutable global value, and the apps can add periodic callbacks to “poll” the state of that global object. This is more of a “pull” model.

There are probably other options as well.
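As a rough illustration of the “pull” option, here is a minimal sketch. The names shared, publish and make_poller are hypothetical, not part of Bokeh; the only Bokeh call used is doc.add_periodic_callback. It assumes the socket handler and the sessions all run on the same IOLoop thread, as in the posted bk_worker.

```python
# Shared state: written by the socket handler, read by every session.
# Assumption: writer and readers run on the same IOLoop thread; add a
# threading.Lock if the writer lives on a different thread.
shared = {"version": 0, "data": None}


def publish(new_data):
    """Call this from receive_async instead of doc.add_next_tick_callback."""
    shared["data"] = new_data
    shared["version"] += 1


def make_poller(on_new_data):
    """Build a per-session callback that fires only when the data changed."""
    # Start from the current version, so a new session only reacts to
    # data published after it connected.
    seen_version = shared["version"]

    def poll():
        nonlocal seen_version
        if shared["version"] != seen_version:
            seen_version = shared["version"]
            on_new_data(shared["data"])

    return poll


def home_app(doc):
    # Bokeh calls this once per session, with that session's own document.
    # A real app would update a ColumnDataSource here instead of printing.
    poll = make_poller(lambda data: print("new data:", data))
    doc.add_periodic_callback(poll, 500)  # period in milliseconds
```

Because the periodic callback is registered on the session's own document, it runs with that document's lock held, so changing a data source inside on_new_data should not trigger the lock error mentioned above.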


Hi Bryan,

Thanks a lot. That was a big help. I ended up passing the Server object as a parameter through the chain of functions down to the second-to-last callback, which loops through the sessions and adds the final callback to the document for each session. My better understanding of the Document object will likely lead to other solutions and more use cases for this pattern. Much appreciated.
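For anyone landing here later, the pattern looks roughly like the sketch below. It is not the exact code from my app: push_to_sessions is a hypothetical helper name, and the app path "/" is assumed from the Server({'/': home_app}, ...) call in the original post. The Bokeh calls relied on are server.get_sessions(), session.document and doc.add_next_tick_callback().

```python
from functools import partial


def source_update(doc, new_data):
    # Runs as a next-tick callback, i.e. with the document lock held,
    # so modifying models that belong to `doc` is safe here.
    print(f"updating a session document with {len(new_data)} message(s)")


def push_to_sessions(server, new_data, app_path="/"):
    """Schedule source_update on the document of every active session."""
    sessions = server.get_sessions(app_path)
    for session in sessions:
        doc = session.document  # the per-session Document, not a global curdoc()
        doc.add_next_tick_callback(partial(source_update, doc, new_data))
    return len(sessions)
```

In receive_async, the call doc.add_next_tick_callback(partial(source_update, msgs)) is then replaced by push_to_sessions(server, msgs), with server bound into sock_handler via partial in the same way dataserver already is.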
