Live data streaming gives this error, even though the new data has exactly the same columns as the original source.
The code seems clear to me, so where am I going wrong? See the update function code below.
2020-06-08 19:11:36,690 Error thrown from periodic callback:
2020-06-08 19:11:36,691 Traceback (most recent call last):
File "/home/superusr/.local/lib/python3.6/site-packages/tornado/gen.py", line 501, in callback
result_list.append(f.result())
File "/home/superusr/.local/lib/python3.6/site-packages/tornado/gen.py", line 748, in run
yielded = self.gen.send(value)
File "/home/superusr/.local/lib/python3.6/site-packages/bokeh/server/session.py", line 70, in _needs_document_lock_wrapper
result = yield yield_for_all_futures(func(self, *args, **kwargs))
File "/home/superusr/.local/lib/python3.6/site-packages/bokeh/server/session.py", line 191, in with_document_locked
return func(*args, **kwargs)
File "/home/superusr/.local/lib/python3.6/site-packages/bokeh/document/document.py", line 1127, in wrapper
return doc._with_self_as_curdoc(invoke)
File "/home/superusr/.local/lib/python3.6/site-packages/bokeh/document/document.py", line 1113, in _with_self_as_curdoc
return f()
File "/home/superusr/.local/lib/python3.6/site-packages/bokeh/document/document.py", line 1126, in invoke
return f(*args, **kwargs)
File "/home/superusr/Desktop/colaboratory_python/mdpr7_cassandra.py", line 240, in updateLiveData
source.stream(df_newData.to_dict(orient='list'), 100000) #keep 100000 samples of data at all times
File "/home/superusr/.local/lib/python3.6/site-packages/bokeh/models/sources.py", line 414, in stream
self._stream(new_data, rollover)
File "/home/superusr/.local/lib/python3.6/site-packages/bokeh/models/sources.py", line 493, in _stream
raise ValueError("Must stream updates to all existing columns (missing: %s)" % ", ".join(sorted(missing)))
ValueError: Must stream updates to all existing columns (missing: index)
Update function code:
def getLiveData(lastTimeStamp):
    """Fetch rows newer than the global ``end_time`` from Cassandra,
    annotate them for display, append them to the global ``dcu_data``
    accumulator, and return only the newly fetched rows.

    NOTE(review): the ``lastTimeStamp`` parameter is currently unused —
    the query is built from the global ``end_time`` instead. Kept for
    interface compatibility with the existing caller.

    Returns:
        pd.DataFrame: the new rows, sorted by ``tagdatetime`` ascending,
        with added ``disp_tagdatetime`` and ``color`` columns.
    """
    global dcu_data
    global start_time
    global end_time
    query = """SELECT tagdatetime, tagid, parameterid, tagvalue, tagtype, tagname FROM load_profile_data
WHERE tagid>50001 and tagid<=50006 and tagdatetime>'{0}' ALLOW FILTERING;""".format(end_time)
    print(query)
    query_res = session.execute(query)
    live_data = pd.DataFrame.from_dict(query_res)
    print("\n\n\n\n\n\n\n")
    print("updated data-------->\n", live_data)
    print("\n")
    live_data.sort_values("tagid", ascending=True, inplace=True)
    live_data['tagdatetime'] = pd.to_datetime(live_data['tagdatetime'])
    # Pre-rendered timestamp string for hover/display widgets.
    live_data['disp_tagdatetime'] = live_data['tagdatetime'].dt.strftime("%Y-%m-%d %H:%M:%S")
    # Copy each tag's color over from the already-colored dcu_data frame.
    live_data['color'] = np.nan
    for i in available_tags:
        live_data.loc[live_data['tagid'] == i, ['color']] = dcu_data.loc[dcu_data['tagid'] == i, 'color'].iloc[0]
    live_data.sort_values("tagdatetime", ascending=True, inplace=True)
    # Advance the high-water mark so the next poll only fetches newer rows.
    end_time = live_data['tagdatetime'].iloc[-1]
    # BUG FIX: DataFrame.append is NOT in-place — it returns a new frame.
    # The original call discarded the result, so dcu_data never grew.
    dcu_data = dcu_data.append(live_data, ignore_index=True)
    print("updated data-------->\n", dcu_data)
    print("\n")
    return live_data
def updateLiveData():
    """Periodic Bokeh callback: fetch the newest rows, keep only the
    currently selected tags, and stream them into the ColumnDataSource.
    """
    df_newData = getLiveData(end_time)
    df_newData = df_newData[df_newData['tagid'].isin(curTag)]
    print("updated data2222-------->\n", df_newData)
    print("\n\n\n\n\n\n\n")
    # BUG FIX: a ColumnDataSource built from a DataFrame also contains an
    # 'index' column, but to_dict(orient='list') omits the DataFrame index,
    # so stream() raises "Must stream updates to all existing columns
    # (missing: index)". reset_index() materializes the index as an
    # 'index' column so every existing source column is supplied.
    source.stream(df_newData.reset_index().to_dict(orient='list'), 100000)  # keep 100000 samples of data at all times
    return