Streaming live data to bar chart

Hello,

I’m working on a dashboard for a manufacturing department, and my first goal is simply to display the output of various lines in a bar chart with different bin sizes, e.g. per hour, per day, per week, etc. I can get a simple chart to show up, but I want to be able to stream the data to it. I’ve scoured the internet for ways to do this, but I couldn’t find any examples where someone was taking in live data rather than generating it themselves in the update callback.

I’m nowhere near an experienced programmer, but I’m not new to Python either. Hopefully it is something simple that I’m missing, and someone can quickly point it out.

Anyway I’m taking in a table of data that posts results every time a part comes off of the line. I’m storing the data in a pandas DataFrame with a datetime index. I’m then resampling it into the different bintimes I want, and passing it to the ColumnDataSource that way.

Timestamp Completed
2017-12-11 17:21:46.960 599187
2017-12-11 17:21:25.810 599180
2017-12-11 17:10:18.397 599056
2017-12-11 17:06:38.567 598991
2017-12-11 17:05:57.423 598978
2017-12-11 17:05:28.527 598974
2017-12-11 17:05:00.940 598966

The numbers in the right column are the unique keys of the field.

This is resampled and passed to the ColumnDataSource as:

Timestamp Completed
2017-12-11 17:00:00 14
2017-12-11 16:00:00 8
2017-12-11 15:00:00 10
2017-12-11 14:00:00 8
2017-12-11 13:00:00 0
2017-12-11 12:00:00 0
2017-12-11 11:00:00 30

The graphs work wonderfully, but when I try to stream the data with my get_update function:

def get_update(self):
    #updates ColumnDataSource
    #tried:
        #building dict directly
        #making it a ColumnDataSource, then re-making it a dataframe
        #making datetime type Timestamp and datetime64[ns]
       
    #fetch new data
    new_df = self.resample_output()
   
    #check to see if new bin was created
    if new_df.index[0] > self.df.index[0]:
        #if yes, patch previous one and stream new one
       
        stream = new_df[:1]
        patch = new_df[1:2]
        self.source.patch(patch)
        self.source.stream(stream)
       
    else:
        #just patch latest one
        patch = new_df[:1]
        self.source.patch(patch)


I keep getting an error:

Error thrown from periodic callback: TypeError("'numpy.int64' object is not iterable",)

When I tried reformatting the new data I simply got the same error with 'numpy.int64' replaced by 'Timestamp' or whatever I had formatted it as.

I’ve tried several different formats, passing the new data as a dict or as a dataframe, but I couldn’t get it to work.
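
One plausible reading of this error, offered as a sketch rather than a diagnosis: `ColumnDataSource.patch` expects a dict that maps each column name to a list of `(index, new_value)` tuples, so when a DataFrame is passed instead, each bare cell value gets unpacked as if it were such a pair. The stand-in below does not use Bokeh; `apply_patches` is a hypothetical helper that only mimics that unpacking step.

```python
# Hypothetical stand-in for the unpacking step assumed to happen inside
# ColumnDataSource.patch; this is NOT Bokeh's actual source code.
def apply_patches(data, patches):
    """Apply {column: [(index, new_value), ...]} patches to a dict of lists."""
    for column, column_patches in patches.items():
        for index, new_value in column_patches:  # each entry must be a pair
            data[column][index] = new_value


data = {"Completed": [14, 8, 10]}

# Expected shape: a list of (index, new_value) tuples per column.
apply_patches(data, {"Completed": [(0, 15)]})

# Wrong shape: bare values, roughly what a DataFrame column supplies.
try:
    apply_patches(data, {"Completed": [16]})
    error = None
except TypeError as exc:
    error = exc  # unpacking a single number as a pair fails
```

The exact wording of the TypeError depends on the Python version and on the type of the value, which would fit the message changing from `numpy.int64` to `Timestamp` when the data was reformatted.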

Here is the rest of my code if you need it. A lot of the steps are pulled apart for debugging, but if you see any ways I can improve it please let me know, I’m always trying to learn! Thanks!

import pyodbc
import datetime
import math
import time
import pandas as pd
import numpy as np
from bokeh.plotting import figure, curdoc
from bokeh.io import show, output_file, output_notebook, save
from bokeh.models import ColumnDataSource, DatetimeTickFormatter, FixedTicker, LabelSet, Range1d
from bokeh.layouts import gridplot, row, column, widgetbox
from bokeh.core.properties import Instance, Seq
from bokeh.models.widgets import Div

output_file = "MOE1output.html"

class OutputChart:
    def __init__(self, sql, conn, bintime = 'h', mult = 1, xlink=None):
        self.sql = sql
        self.conn = conn
        self.xlink = xlink
        self.df = None
        self.bintime = bintime
        self.mult = mult

    def get_output(self):
        #gets the raw data, datetimeindexed unique ids from source tables
        rawdf = pd.read_sql_query(self.sql, self.conn, index_col = "Timestamp")
        self.rawdf = rawdf
        return self.rawdf

    def resample_output(self):
        #resamples data into given bintime by counting unique ID field
        #could be combined with get_output and make_source
        rawdf = self.get_output()
        df = rawdf.resample(self.bintime, label='left', closed='left').count()

        #sorts newest first (reverse chronological order)
        df = df.sort_index(ascending=False)

        #only shows most recent 100 bins
        df = df.head(n=100)

        #multiplier for number of parts on carrier
        df.Completed = df.Completed*self.mult

        self.df = df

        return self.df

    def make_source(self, df):
        #turns df into ColumnDataSource
        #could be combined with resample_output and get_output
        self.source = ColumnDataSource(df)

        return self.source

    def get_update(self):
        #updates ColumnDataSource
        #tried:
            #building dict directly
            #making it a ColumnDataSource, then re-making it a dataframe
            #making datetime type Timestamp and datetime64[ns]

        #fetch new data
        new_df = self.resample_output()

        #check to see if new bin was created
        if new_df.index[0] > self.df.index[0]:
            #if yes, patch previous one and stream new one

            stream = new_df[:1]
            patch = new_df[1:2]
            self.source.patch(patch)
            self.source.stream(stream)

        else:
            #just patch latest one
            patch = new_df[:1]
            self.source.patch(patch)

    #draws output chart
    def draw_figure(self, width=800, xlink=None):

        #dicts for settings based on bintime
        bins = {'h' : '%m-%d %H:%M', 'D' : '%a %m-%d', 'W' : '%F', 'M' : '%Y-%m'}
        titles = {'h' : 'Hourly', 'D' : 'Daily', 'W' : 'Weekly', 'M' : 'Monthly'}
        widths = {'h' : 3600000, 'D' : 86400000, 'W' : 604800000, 'M' : 2419200000}

        #space between bars
        pad = 0.9

        #create df
        df = self.resample_output()

        #create source
        source = self.make_source(df)

        #initiate now variable
        now = np.datetime64(datetime.datetime.now())

        #handle months because numpy gets confused, okay because only used to set ranges, could move to xlink check
        if self.bintime == 'M':
            #if bintime is a month, approximate with 30 days

            padspace = now + np.timedelta64(1*30, 'D')
            dispdelta = now - np.timedelta64(7*30, 'D')

        else:
            #just use np.timedelta

            padspace = now + (0.75 * np.timedelta64(1, self.bintime))
            dispdelta = now - np.timedelta64(8, self.bintime)

        #create tickers because bokeh doesn't specify week beginning dates; / 10**6 converts from ns to ms for bokeh
        tickers = pd.to_datetime(df.index.values).astype(int) / 10**6

        #check for linked ranges
        if self.xlink is None:
            #if no linked range, set based on calculated ranges, could move range calculation in here

            xrange = (dispdelta, padspace)

        else:
            #link ranges
            xrange = self.xlink

        #adjust y range to account for angled labels
        yrange = (0, df.Completed.max()*1.15)

        #create figure, need to add way to manually set plot height or set to fill space given
        output = figure(x_axis_type='datetime', plot_height=300, width=width, title=titles[self.bintime] + ' Output', x_range=xrange, y_range=yrange, tools ='xpan,reset', toolbar_location='above')

        #create bars
        output.vbar(x='Timestamp', top='Completed', width=widths[self.bintime]*pad, source=source)

        #format axis labels
        output.xaxis.formatter = DatetimeTickFormatter(hours=bins[self.bintime], days=bins[self.bintime], months=bins[self.bintime])

        #force axis to set ticker
        output.xaxis.ticker = FixedTicker(ticks = tickers)

        #set and add labels
        count = LabelSet(x='Timestamp', y='Completed', text='Completed', y_offset = 4, level='underlay', source=self.source, text_align='center')
        output.add_layout(count)

        #change angle based on width
        if width <= 550:
            #below 550, tilt them

            output.xaxis.major_label_orientation = math.pi/4

        elif width <= 300:
            #below 300, give more room on y range

            count.angle = math.pi/4
            count.text_align = 'left'
            output.y_range = Range1d(0, df.Completed.max()*1.25)

        return output

        #create range attribute, may not be necessary?
        self.xlink = output.xrange

#line connection and sql data, will be stored in db on webapp
mpl = {'conn' : REDACTED,
'sql' : REDACTED,
'title' : Div(text="""
<div><h1>MPL</h1></div>
""")
}

lr = {'conn' : REDACTED,
'sql' : REDACTED,
'title' : Div(text="""
<div><h1>Low Run</h1></div>
""")
}

#instantiate classes for mpl
mplD_Data = OutputChart(conn = mpl['conn'], sql=mpl['sql'], mult=2, bintime='D')
mplh_Data = OutputChart(conn = mpl['conn'], sql=mpl['sql'], mult=2, bintime='h')

#drawing figures for mpl, has to be done first to link ranges
mplD = mplD_Data.draw_figure(width=300)
mplh = mplh_Data.draw_figure(width=700)

#instantiate classes for lr
lrh_Data = OutputChart(conn = lr['conn'], sql=lr['sql'], xlink=mplD.x_range, bintime='D')
lrD_Data = OutputChart(conn = lr['conn'], sql=lr['sql'], xlink=mplh.x_range, bintime='h')

#create figures for lr
lrD = lrh_Data.draw_figure(width=300)
lrh = lrD_Data.draw_figure(width=700)

#set grid layout
grid = gridplot([[widgetbox(mpl['title'])],[mplD, mplh],[widgetbox(lr['title'])],[lrD, lrh]])

#callback rate
rate = 5000

#create document
curdoc().add_root(grid)

#add periodic callbacks
curdoc().add_periodic_callback(mplh_Data.get_update, rate)
curdoc().add_periodic_callback(mplD_Data.get_update, rate)
curdoc().add_periodic_callback(lrh_Data.get_update, rate)
curdoc().add_periodic_callback(lrD_Data.get_update, rate)


(resubmitted for formatting issues)

Hi,

Apologies I am too overburdened now to dive into your sizable code. Hopefully a complete minimal working example will be instructive, at least. This example streams a new bar every 10th count, and patches random updates to the last bar in between:

    from random import random

    from bokeh.driving import count
    from bokeh.io import curdoc
    from bokeh.models import ColumnDataSource
    from bokeh.plotting import figure

    p = figure(plot_height=400)

    source = ColumnDataSource(data=dict(x=[], top=[]))

    p.vbar(x='x', top='top', width=2, source=source)

    @count()
    def update(i):
        if i % 10 == 0:
            source.stream(dict(x=[i], top=[5]))
        else:
            ind = len(source.data['top']) -1
            newtop = source.data['top'][ind] + random() - 0.5
            source.patch(dict(top=[(ind, newtop)]))

    curdoc().add_periodic_callback(update, 50)

    curdoc().add_root(p)
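
Applied to the resampled bins from the question, the same stream/patch pattern might look like the sketch below. It is Bokeh-free so it can be run on its own; `stream_payload`, `patch_payload` and `plan_update` are hypothetical helper names, and it assumes the source's rows are kept in chronological order so the newest bar is the last row. It also takes the previously seen bin as an explicit argument, because in the question's `get_update` the call to `resample_output()` reassigns `self.df` before the comparison runs, so the two indexes being compared are the same.

```python
from datetime import datetime


def stream_payload(timestamp, completed):
    """One new bar: every column maps to a list, even for a single row."""
    return {"Timestamp": [timestamp], "Completed": [completed]}


def patch_payload(row_index, completed):
    """One changed bar: the column maps to a list of (index, value) pairs."""
    return {"Completed": [(row_index, completed)]}


def plan_update(previous_latest, bins, n_rows):
    """Decide what to send, given bins as [(timestamp, count), ...] newest first.

    previous_latest is the newest bin seen on the last callback; n_rows is
    the current number of rows in the source (assumption: the newest bar is
    the last row).
    """
    latest_ts, latest_count = bins[0]
    if latest_ts > previous_latest:
        # A new bin opened: finalise the old last bar, then append the new one.
        _, previous_count = bins[1]
        return (patch_payload(n_rows - 1, previous_count),
                stream_payload(latest_ts, latest_count))
    # Same bin as before: only the last bar's count changed.
    return patch_payload(n_rows - 1, latest_count), None


# Values taken from the resampled table in the question.
bins = [(datetime(2017, 12, 11, 17), 14), (datetime(2017, 12, 11, 16), 8)]
patch, stream = plan_update(datetime(2017, 12, 11, 16), bins, n_rows=7)
```

Inside the periodic callback the results would then go to `source.patch(patch)` and, when `stream` is not `None`, `source.stream(stream)`. Depending on the Bokeh version, the timestamp may need converting to match the dtype of the existing `Timestamp` column.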

Thanks,

Bryan


On Dec 12, 2017, at 12:11, Zach Sizemore <[email protected]> wrote:
