Streaming & Patching ColumnDataSource based on pandas DataFrame

This is probably more like feature proposal, but maybe I am missing something and it can be actually done.

Using pandas DataFrame to create ColumnDataSource looks like good idea, however once streaming and patching is considered, it seems that the benefits becomes disadvantages. Let me show what I mean.

First we will crete three different CDSs, one from pure python (dict and list), one using pandas DataFrame and one using DataFrame with categorical data.

>>> cds0 = CDS({'index': [0, 1, 2], 'a':[1, 2, 3], 'b':[1.1, 2.2, 3.3]})
>>> cds0.data
{'index': [0, 1, 2], 'a': [1, 2, 3], 'b': [1.1, 2.2, 3.3]}

>>> df1 = pd.DataFrame({'a': [1, 2, 3], 'b': [1.1, 2.2, 3.3]})
>>> df1
   a    b
0  1  1.1
1  2  2.2
2  3  3.3

>>> cds1 = CDS(df1)
>>> cds1.data
{'index': array([0, 1, 2]), 'a': array([1, 2, 3]), 'b': array([1.1, 2.2, 3.3])}

>>> df2 = pd.DataFrame({'a': [1,2,3], 'b': [1,2,2]})
>>> df2['b'] = df2['b'].astype('category')
>>> df2
   a  b
0  1  1
1  2  2
2  3  2
>>> df2.b.dtypes
CategoricalDtype(categories=[1, 2], ordered=False)

>>> cds2 = CDS(df2)
>>> cds2.data
{'index': array([0, 1, 2]), 'a': array([1, 2, 3]), 'b': [1, 2, 2]
Categories (2, int64): [1, 2]}

As one can see, the CDSs are internally represented with numpy.ndarray if created from DataFrame, otherwise with python list.

Next, let’s try to stream new data to the CDSs.

>>> cds0.stream({'index': [3], 'a': [4], 'b': [4.4]})
>>> cds0.data
{'index': [0, 1, 2, 3], 'a': [1, 2, 3, 4], 'b': [1.1, 2.2, 3.3, 4.4]}

>>> cds1.stream({'index': [3], 'a': [4], 'b': [4.4]})
>>> cds1.data
{'index': array([0, 1, 2, 3]), 'a': array([1, 2, 3, 4]), 'b': array([1.1, 2.2, 3.3, 4.4])}

>>> cds2.stream({'index': [3], 'a': [4], 'b': [1]})
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.9/site-packages/bokeh/models/sources.py", line 415, in stream
    self._stream(new_data, rollover)
  File "/usr/lib/python3.9/site-packages/bokeh/models/sources.py", line 527, in _stream
    self.data._stream(self.document, self, new_data, rollover, setter)
  File "/usr/lib/python3.9/site-packages/bokeh/core/property/wrappers.py", line 423, in _stream
    L.extend(new_data[k])
AttributeError: 'Categorical' object has no attribute 'extend'

As one can see, streaming to categorical column failed (surprisingly by complaining about missing extend method even though numpy.ndarray also des not have this method, only list does) even though the new data fits the categories.
BTW, the streaming fails also if one uses IntegerArray for columns…

Next, let’s try to patch some data.

>>> cds0.data
{'index': [0, 1, 2, 3], 'a': [1, 5, 3, 4], 'b': [1.1, 5.5, 3.3, 4.4]}
>>> cds0.patch({'a': [(slice(1, 2), [5])], 'b': [(slice(1, 2), [5.5])]})
>>> cds0.data
{'index': [0, 1, 2, 3], 'a': [1, 5, 3, 4], 'b': [1.1, 5.5, 3.3, 4.4]}

>>> cds1.data
{'index': array([0, 1, 2, 3]), 'a': array([1, 5, 3, 4]), 'b': array([1.1, 5.5, 3.3, 4.4])}
>>> cds1.patch({'a': [(slice(1, 2), [5])], 'b': [(slice(1, 2), [5.5])]})
>>> cds1.data
{'index': array([0, 1, 2, 3]), 'a': array([1, 5, 3, 4]), 'b': array([1.1, 5.5, 3.3, 4.4])}

>>> cds2.data
{'index': array([0, 1, 2]), 'a': array([1, 2, 3]), 'b': [1, 2, 2]
Categories (2, int64): [1, 2]}
>>> cds2.patch({'a': [(slice(1, 2), [4])], 'b': [(slice(1, 2), [1])]})
>>> cds2.data
{'index': array([0, 1, 2]), 'a': array([1, 4, 3]), 'b': [1, 1, 2]
Categories (2, int64): [1, 2]}

>>> cds2.patch({'a': [(slice(1, 2), [4])], 'b': [(slice(1, 2), [0])]})
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python3.9/site-packages/bokeh/models/sources.py", line 684, in patch
    self.data._patch(self.document, self, patches, setter)
  File "/usr/lib/python3.9/site-packages/bokeh/core/property/wrappers.py", line 463, in _patch
    self[name][ind] = value
  File "/usr/lib/python3.9/site-packages/pandas/core/arrays/_mixins.py", line 211, in __setitem__
    value = self._validate_setitem_value(value)
  File "/usr/lib/python3.9/site-packages/pandas/core/arrays/categorical.py", line 1893, in _validate_setitem_value
    raise ValueError(
ValueError: Cannot setitem on a Categorical with a new category, set the categories first

So patching works just fine, also patching categorical column with value not initially in the category faild as expected.

Finally, lets try to prepend (and delete) some data. ColumnDataSource has no explicite support for such operations, but one can use little trick to do so (at least sometimes).

>>> cds0.data
{'index': [0, 1, 2, 3], 'a': [1, 5, 3, 4], 'b': [1.1, 5.5, 3.3, 4.4]}

# prepend
>>> cds0.patch({'index': [(slice(None, 0), [-1])], 'a': [(slice(None, 0), [0])], 'b': [(slice(None, 0), [0])]})
>>> cds0.data
{'index': [-1, 0, 1, 2, 3], 'a': [0, 1, 5, 3, 4], 'b': [0, 1.1, 5.5, 3.3, 4.4]}

# delete from start
>>> cds0.patch({'index': [(slice(None, 2), [])], 'a': [(slice(None, 2), [])], 'b': [(slice(None, 2), [])]})
>>> cds0.data
{'index': [1, 2, 3], 'a': [5, 3, 4], 'b': [5.5, 3.3, 4.4]}

# delete from end
>>> cds0.data
{'index': [1, 2, 3], 'a': [5, 3, 4], 'b': [5.5, 3.3, 4.4]}
>>> cds0.patch({'index': [(slice(1, None), [])], 'a': [(slice(1, None), [])], 'b': [(slice(1, None), [])]})
>>> cds0.data
{'index': [1], 'a': [5], 'b': [5.5]}

# delete from middle
>>> cds0 = CDS({'index': [0, 1, 2, 3], 'a':[1, 2, 3, 4], 'b':[1.1, 2.2, 3.3, 4.4]})>>> cds0.data
{'index': [0, 1, 2, 3], 'a': [1, 2, 3, 4], 'b': [1.1, 2.2, 3.3, 4.4]}
>>> cds0.patch({'index': [(slice(1, 3), [])], 'a': [(slice(1, 3), [])], 'b': [(slice(1, 3), [])]})
>>> cds0.data
{'index': [0, 3], 'a': [1, 4], 'b': [1.1, 4.4]}

Such patching is not possible for DataFrame based CDSs. It either raised ValueError complaining about broadcasting or do nothing at all.

I understand that this is due to numpy.ndarray that does not support ‘size-changing’ operations, however it would be nice to have a support for such operations as there are many usecases where prepending data (going into the past) or droping data can be useful.

So to summarize:

  1. I would like to be able to stream ColumnDataSource with categorical columns.
    (This is the big one, since I must cast all my DataFrames to float to be able to stream new data to it.)
  2. I would like to hane support for prepending and removing data to/from ColumnDataSource.

@rad AFAIK no consideration has been given to categorical column specifically, so I suppose that is technically in “undefined behavior” territory. We do have tests to maintain that streaming and patching to basic column types. Maybe this falls outside that, or maybe there has been a downstream changes that broke something that used to work (but was not under test).

In any case the GitHub tracker is a better place to engage for either feature requests or bug reports. As a heads-up it will be much more helpful if you can organize the example cases in to a complete script(s) to share on the issue, rather than snippets from an interpreter.

If you want to take a poke at the code yourself, the relevant block is here:

It’s possible another block could be added there to special-case for Pandas series, but Pandas is not a hard dependency of Bokeh so it will need to first check that pandas is installed in the same way other places in the code base do.

Thank you Bryan, I will have a look on both (the issue tracker and the code itself). I just wanted to make sure I am not doing something wrong.