Issue with Tornado/Callbacks

I’ve been building an application that consumes a UDP stream and adds the data to a bokeh plot as it arrives. Consuming the UDP stream works fine, and most of the time the bokeh callbacks add the data correctly, but occasionally I get the long error message below.

The build is on bokeh 0.12.9 and tornado 4.5.2:

2017-10-25 14:35:52,825 Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x000000ADC0CA3048>)
Traceback (most recent call last):
  File "d:\afcbm\continuum\lib\site-packages\tornado\ioloop.py", line 605, in _run_callback
    ret = callback()
  File "d:\afcbm\continuum\lib\site-packages\tornado\stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "d:\afcbm\continuum\lib\site-packages\bokeh\util\tornado.py", line 133, in wrapper
    self.remove_next_tick_callback(callback)
  File "d:\afcbm\continuum\lib\site-packages\bokeh\util\tornado.py", line 160, in remove_next_tick_callback
    self._remove(callback, self._next_tick_callbacks)
  File "d:\afcbm\continuum\lib\site-packages\bokeh\util\tornado.py", line 155, in _remove
    self._error_on_double_remove(callback, callbacks)
  File "d:\afcbm\continuum\lib\site-packages\bokeh\util\tornado.py", line 116, in _error_on_double_remove
    raise ValueError("Removing a callback twice (or after it's already been run)")
ValueError: Removing a callback twice (or after it's already been run)

Minimal code example for the server:

import socket
import struct
import time

from bokeh.plotting import figure
from bokeh.layouts import column
from bokeh.models import ColumnDataSource
from bokeh.io import curdoc

from threading import Thread
from functools import partial
from tornado import gen

plot_column = column()

UDP_IP = "127.0.0.1"  # 172.17.1.32
UDP_PORT = 29505

sock = socket.socket(socket.AF_INET,     # Internet
                     socket.SOCK_DGRAM)  # UDP
sock.bind((UDP_IP, UDP_PORT))

doc = curdoc()

f = figure(plot_width=700, plot_height=200)
source = ColumnDataSource(data=dict(x=[], y=[]))
l = f.circle(x='x', y='y', source=source)
plot_column.children.insert(0, f)


@gen.coroutine
def update(x, val):
    # Scheduled as a next tick callback; appends one point to the plot.
    source.stream(dict(x=[x], y=[val]))


def receive_loop():
    # Runs on a background thread: blocks on the socket and schedules
    # one document update per datagram received.
    while True:
        data, addr = sock.recvfrom(1024)  # buffer size
        val = struct.unpack('!i', data)[0]
        doc.add_next_tick_callback(partial(update, x=time.time(), val=val))


doc.add_root(plot_column)

thread = Thread(target=receive_loop)
thread.start()

Minimal code example for the client:

import socket
from random import random
import time
import struct

UDP_IP = "127.0.0.1"  # 172.17.1.32
UDP_PORT = 29505

sock = socket.socket(socket.AF_INET,
                     socket.SOCK_DGRAM)

x = 0
while True:
    time.sleep(0.2)
    x += 1
    sock.sendto(struct.pack('!i', x), (UDP_IP, UDP_PORT))
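For anyone reproducing this, the payload format is simple: '!i' is a single 4-byte signed integer in network (big-endian) byte order, so every datagram is exactly four bytes. Below is a minimal round-trip sketch of what the client packs and the server unpacks. The names PACKET_FORMAT, encode_sample and decode_sample are only for illustration and do not appear in either script.

```python
import struct

# '!' selects network (big-endian) byte order, 'i' a 4-byte signed int.
PACKET_FORMAT = '!i'


def encode_sample(value):
    """Pack one integer the same way the client does before sendto()."""
    return struct.pack(PACKET_FORMAT, value)


def decode_sample(data):
    """Unpack one datagram the same way the server does after recvfrom()."""
    return struct.unpack(PACKET_FORMAT, data)[0]


packet = encode_sample(1)
print(len(packet), decode_sample(packet))
```

Because each packet is four bytes, the 1024-byte receive buffer in the server is far larger than needed, so truncated reads should not be a factor here.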

Interestingly, I’ve been able to create a temporary workaround on my system by modifying the tornado.py file within bokeh (the bokeh\util\tornado.py shown in the traceback) and adding a time.sleep(.001) call at line 154, inside the _remove method. With any amount of sleep while removing the next tick callback, the ValueError shown above no longer triggers. Without that change the error appears at random points in the data stream, with no consistent pattern.
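An alternative to patching bokeh that might be worth trying (untested against this setup) is to keep the receiving thread away from the document entirely: the thread only pushes samples onto a queue.Queue, and a periodic callback drains the queue and streams everything in one call. The sketch below shows only the hand-off logic. The names pending, enqueue_sample and drain_pending and the 100 ms period are hypothetical, and the bokeh wiring is shown only in comments.

```python
import queue
import time

# Thread-safe hand-off between the receiving thread and the server thread.
pending = queue.Queue()


def enqueue_sample(val, now=None):
    """Would be called from receive_loop() in place of add_next_tick_callback."""
    timestamp = time.time() if now is None else now
    pending.put((timestamp, val))


def drain_pending(q):
    """Collect everything currently queued into a dict shaped for source.stream."""
    xs, ys = [], []
    while True:
        try:
            x, val = q.get_nowait()
        except queue.Empty:
            break
        xs.append(x)
        ys.append(val)
    return dict(x=xs, y=ys)


# Assumed wiring in the server script (not run here):
#
#   def flush():
#       new_data = drain_pending(pending)
#       if new_data['x']:
#           source.stream(new_data)
#
#   doc.add_periodic_callback(flush, 100)  # period in milliseconds
```

With this layout only the periodic callback touches the ColumnDataSource, so no callback is registered from the background thread at all. Whether that avoids the ValueError above is an assumption that would need to be checked.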

Any thoughts/observations would be appreciated.

Thanks,

Kyle