Standalone WSGI web container with client-data upload to embedded bokeh server

Hi,

I have a Bokeh server application that enables interactive analysis of multi-class classification data. A final step is to support analysis of data uploaded by an authenticated user.

The basic model I am following is a Flask app, deployed via a standalone WSGI container, that embeds the Bokeh server. The index route presents an HTML file-upload form, reads the user-provided data from the upload file stream, serializes it to JSON, and saves it in the Flask session as session['data'].

The request is then redirected to an /embed route, which passes the data to the Bokeh server app as an argument.
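
As a rough illustration of what "passing the data as an argument" involves: the arguments given to server_document are sent as HTTP request arguments, and the Bokeh app reads them back as lists of byte strings. The sketch below only mimics that round trip with the standard library. It does not call Bokeh; the helper names and the port 5006 are hypothetical.

```python
import json
from urllib.parse import urlencode, parse_qs


def encode_arguments(base_url, arguments):
    """Append arguments to base_url as a percent-encoded query string."""
    return base_url + "?" + urlencode(arguments)


def decode_arguments(url):
    """Return {name: [bytes, ...]}, loosely mimicking Tornado's request.arguments."""
    query = url.split("?", 1)[1]
    return {
        name: [value.encode("utf-8") for value in values]
        for name, values in parse_qs(query).items()
    }


# A small JSON payload in the same column-oriented shape that DataFrame.to_json() emits.
payload = json.dumps({"class_a": {"0": 12, "1": 7}, "class_b": {"0": 3, "1": 30}})

# Hypothetical URL; in the app above the port comes from bind_sockets().
url = encode_arguments("http://localhost:5006/embed", {"data": payload})

args = decode_arguments(url)
restored = json.loads(args["data"][0])  # json.loads accepts bytes directly
```

Because the payload travels inside a URL, this approach is probably only suitable for small data sets such as the 3x3 matrix mentioned below.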

This works as expected when I use the development flask server or if I use Gevent as a production flask server.

If I try to extend the functionality and run via Gunicorn, using the example on bokeh’s github (1) as a starting point, things are inconsistent.

The inconsistency is that the data are sometimes loaded and sometimes not, with no apparent pattern. Empirically, it seems to work more often with a single Gunicorn worker process, but I cannot yet say that is always the case.

I suspect the problem is my unfamiliarity with Flask's application context and the session information therein. (I have kept my data fairly small, a 3x3 matrix of floats, to stay under the ~4 KB session cookie size limit I read about.)

The following is a sketch of an app similar to what I am trying to run. Disclaimer: this will not run as is, because I haven't provided the upload form HTML, CSS, etc., but hopefully it shows where I am doing something egregious, given that the Flask framework is new to me.

Additionally, are there any simple examples around that would be a good starting point for passing data around like this, and ultimately forwarding it to Bokeh, using the Flask session (or the Flask-Session extension) under Gunicorn?
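
To get a feel for the cookie size concern, here is a rough, standard-library-only estimate of how large a session payload becomes once it is JSON-serialized and base64-encoded. It is a sketch under stated assumptions: the 4093-byte figure is a commonly cited per-cookie browser limit rather than something taken from this thread, the function name is hypothetical, and the estimate ignores the signature Flask appends and any compression it may apply.

```python
import base64
import json

# Assumption: a commonly cited per-cookie limit in browsers.
COOKIE_LIMIT_BYTES = 4093


def approx_cookie_payload_size(session_dict):
    """Estimate the base64-encoded size in bytes of a JSON-serialized session."""
    raw = json.dumps(session_dict, separators=(",", ":")).encode("utf-8")
    return len(base64.urlsafe_b64encode(raw))


# A 3x3 matrix of floats, stored as a JSON string under session['data'].
small_matrix = [[0.123456789 * (i + j + 1) for j in range(3)] for i in range(3)]
small = approx_cookie_payload_size({"data": json.dumps(small_matrix)})

# A 100x100 matrix for comparison.
large_matrix = [[0.5] * 100 for _ in range(100)]
large = approx_cookie_payload_size({"data": json.dumps(large_matrix)})

print(small, small < COOKIE_LIMIT_BYTES)
print(large, large < COOKIE_LIMIT_BYTES)
```

The 3x3 case fits comfortably, while the 100x100 case is well beyond what a cookie-based session could hold, which is where a server-side session store would come in.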

Thanks

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
"""
import os

import asyncio

import numpy as np
import pandas as pd

from datetime import timedelta

from flask import Flask, session
from flask import redirect, render_template, url_for

from flask_session import Session
#from werkzeug.utils import secure_filename

from flask_wtf import FlaskForm
from flask_wtf.file import FileField, FileAllowed, FileRequired

from threading import Thread

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

from bokeh.application import Application
from bokeh.application.handlers import FunctionHandler

from bokeh.embed import server_document

from bokeh.server.server import BaseServer
from bokeh.server.tornado import BokehTornado
from bokeh.server.util import bind_sockets

from bokeh.models import Slider

_EMPTY_DATAFRAME_JSON = lambda: pd.DataFrame().to_json()


# flask app
SECRET_KEY = os.urandom(32)
DATA_ALLOW_EXTENSIONS = ['csv']

app = Flask(__name__)
app.config['SECRET_KEY'] = SECRET_KEY
app.config['DATA_ALLOW_EXTENSIONS'] = DATA_ALLOW_EXTENSIONS

#app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_TYPE'] = 'filesystem'

# Maximum number of items a session keeps
# before it starts to delete items
app.config['SESSION_FILE_THRESHOLD'] = 100  

app.config['SESSION_PERMANENT'] = False
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=3)

#Session(app)

class DataForm(FlaskForm):
    file = FileField('data',
                     validators=[FileRequired(), FileAllowed(DATA_ALLOW_EXTENSIONS, 'Unallowed filetype')])
    submit = 'Upload data'


# NB: bind_sockets() will allow each process to listen on its own port if app is
#     run via gunicorn with multiple worker processes, e.g. "gunicorn -w 4 ..."  
sockets, port = bind_sockets("localhost", 0)


@app.route('/', methods=['GET', 'POST'])
def upload():
    form = DataForm()
    if form.validate_on_submit():
        _stream = form.file.data
        _stream.seek(0)
        session['data'] = pd.read_csv(_stream).to_json()
        return redirect(url_for('embed'))
    else:
        session['data'] = pd.DataFrame().to_json()

    return render_template('index.html', form=form)

@app.route('/embed', methods=['GET'])
def embed():
    if 'data' in session and session['data'] and session['data'] != _EMPTY_DATAFRAME_JSON():
        _data = session['data']
        session['data'] = _EMPTY_DATAFRAME_JSON()
    else:
        #return redirect(url_for('upload'))
        _data = pd.DataFrame(data=(100*np.random.rand(2,2)).astype(int), columns=('class_a','class_b')).to_json()

    script = server_document("http://localhost:%d/embed" % port, arguments=dict(data=_data))
    return render_template("embed.html", script=script, template="Flask")


def bk_app(doc):    
    args = doc.session_context.request.arguments
    df = pd.read_json(args.get('data')[0])

    slider = Slider(start=0, end=500, value=df.values.sum(), step=1, title="Trials")
    doc.add_root(slider)


bk_app = Application(FunctionHandler(bk_app))


def bk_worker():
    asyncio.set_event_loop(asyncio.new_event_loop())

    bokeh_tornado = BokehTornado({'/embed': bk_app}, extra_websocket_origins=["localhost:8000"])
    bokeh_http = HTTPServer(bokeh_tornado)
    bokeh_http.add_sockets(sockets)

    server = BaseServer(IOLoop.current(), bokeh_tornado, bokeh_http)
    server.start()
    server.io_loop.start()


Thread(target=bk_worker, daemon=True).start()

UPDATE

I have simplified my example for the current trivial case to not use intermediate storage of data in the session and instead directly call the bokeh server when the Flask-WTF form is validated. It seems like when I am running under Gunicorn with multiple worker processes, the form validation behaves inconsistently. I am sure I am missing something with how to handle this properly in that scenario.

I run the app as follows to see printouts in the terminal window; representative Python printouts are shown below the command.

gunicorn -w 4 bkapp:app --error-logfile gerr.log 

FORM INVALID ... RETRY

FORM INVALID ... RETRY

FORM INVALID ... RETRY

FORM VALID ... DATA

This is the code that produced that output:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
"""
import os

import asyncio

import numpy as np
import pandas as pd

from datetime import timedelta

from flask import Flask, session
from flask import redirect, render_template, url_for

from flask_session import Session
#from werkzeug.utils import secure_filename

from flask_wtf import FlaskForm
from flask_wtf.file import FileField, FileAllowed, FileRequired

from threading import Thread

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

from bokeh.application import Application
from bokeh.application.handlers import FunctionHandler

from bokeh.embed import server_document

from bokeh.server.server import BaseServer
from bokeh.server.tornado import BokehTornado
from bokeh.server.util import bind_sockets

from bokeh.models import Slider

_EMPTY_DATAFRAME_JSON = lambda: pd.DataFrame().to_json()


# flask app
SECRET_KEY = os.urandom(32)
DATA_ALLOW_EXTENSIONS = ['csv']

app = Flask(__name__)
app.config['SECRET_KEY'] = SECRET_KEY
app.config['DATA_ALLOW_EXTENSIONS'] = DATA_ALLOW_EXTENSIONS

#app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_TYPE'] = 'filesystem'

# Maximum number of items a session keeps
# before it starts to delete items
app.config['SESSION_FILE_THRESHOLD'] = 100  

app.config['SESSION_PERMANENT'] = False
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=3)

#Session(app)

class DataForm(FlaskForm):
    file = FileField('data',
                     validators=[FileRequired(), FileAllowed(DATA_ALLOW_EXTENSIONS, 'Unallowed filetype')])
    submit = 'Upload data'


# NB: bind_sockets() will allow each process to listen on its own port if app is
#     run via gunicorn with multiple worker processes, e.g. "gunicorn -w 4 ..."  
sockets, port = bind_sockets("localhost", 0)


@app.route('/', methods=['GET', 'POST'])
def upload():
    form = DataForm()
    if form.validate_on_submit():
        _stream = form.file.data
        _stream.seek(0)
        _data = pd.read_csv(_stream).to_json()
        print("FORM VALID ... DATA")
        script = server_document("http://localhost:%d/embed" % port, arguments=dict(data=_data))
        return render_template("embed.html", script=script, template="Flask")

    print("FORM INVALID ... RETRY")
    return render_template('index.html', form=form)

@app.route('/embed', methods=['GET'])
def embed():
    if 'data' in session and session['data'] and session['data'] != _EMPTY_DATAFRAME_JSON():
        _data = session['data']
        session['data'] = _EMPTY_DATAFRAME_JSON()
    else:
        #return redirect(url_for('upload'))
        _data = pd.DataFrame(data=(100*np.random.rand(2,2)).astype(int), columns=('class_a','class_b')).to_json()

    script = server_document("http://localhost:%d/embed" % port, arguments=dict(data=_data))
    return render_template("embed.html", script=script, template="Flask")


def bk_app(doc):    
    args = doc.session_context.request.arguments
    df = pd.read_json(args.get('data')[0])

    slider = Slider(start=0, end=500, value=df.values.sum(), step=1, title="Trials")
    doc.add_root(slider)


bk_app = Application(FunctionHandler(bk_app))


def bk_worker():
    asyncio.set_event_loop(asyncio.new_event_loop())

    bokeh_tornado = BokehTornado({'/embed': bk_app}, extra_websocket_origins=["localhost:8000"])
    bokeh_http = HTTPServer(bokeh_tornado)
    bokeh_http.add_sockets(sockets)

    server = BaseServer(IOLoop.current(), bokeh_tornado, bokeh_http)
    server.start()
    server.io_loop.start()


Thread(target=bk_worker, daemon=True).start()

Does this still happen if you remove/comment out the Bokeh parts?

Yes. It is independent of Bokeh.

Upon further inspection, it looks like it is related to CSRF protections with Flask-WTF forms when running under Gunicorn with multiple worker processes.
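
One possible mechanism, offered as an assumption rather than something confirmed in this thread: Flask-WTF signs its CSRF tokens with the app's SECRET_KEY, and the code above sets SECRET_KEY = os.urandom(32) at import time. If each Gunicorn worker imports the module separately, each worker ends up with a different key, so a token issued by one worker would be rejected by the others. The sketch below simulates that with plain HMAC from the standard library; it is a stand-in for the real signing code, and all names in it are hypothetical.

```python
import hashlib
import hmac
import os


def sign(token: bytes, secret_key: bytes) -> bytes:
    """Return an HMAC-SHA256 signature of token under secret_key."""
    return hmac.new(secret_key, token, hashlib.sha256).digest()


def verify(token: bytes, signature: bytes, secret_key: bytes) -> bool:
    """Check signature against token using a constant-time comparison."""
    return hmac.compare_digest(sign(token, secret_key), signature)


def simulate(worker_keys):
    """Sign a token with the first worker's key, then verify it on every worker."""
    token = b"hypothetical-csrf-token"
    signature = sign(token, worker_keys[0])
    return [verify(token, signature, key) for key in worker_keys]


# Case 1: every worker generates its own random key, as would happen if each
# worker process evaluated SECRET_KEY = os.urandom(32) on import.
per_worker = simulate([os.urandom(32) for _ in range(4)])

# Case 2: all workers share one key (for example, read from configuration).
shared_key = os.urandom(32)
shared = simulate([shared_key] * 4)

print(per_worker)
print(shared)
```

In the first case only the worker that issued the token accepts it; in the second case every worker does. With four workers and requests spread across them, that would look much like the mix of "FORM INVALID" and "FORM VALID" printouts shown earlier.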

It is definitely not an issue with Bokeh. So at this point I am just wondering whether anyone in the Bokeh user or developer base has used Flask forms under Gunicorn to send user-supplied data to a Bokeh app.

Thanks!


The following looks like the missing piece, although some more testing is needed to be certain. Again, this is unrelated to Bokeh, but it came up in the context of trying to share data with an embedded Bokeh app.

The following import needs to be added.

from flask_wtf import CSRFProtect

and it needs to be instantiated with the Flask app, viz.

csrf = CSRFProtect()
csrf.init_app(app)
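
Separately from registering CSRFProtect, it may be worth making sure every worker process uses the same SECRET_KEY, since the tokens are signed with it. A minimal sketch, assuming the key is supplied through an environment variable: the variable name FLASK_SECRET_KEY and the helper load_secret_key are hypothetical and not part of Flask or Flask-WTF.

```python
import os


def load_secret_key(environ=os.environ, name="FLASK_SECRET_KEY"):
    """Return the secret key from the environment as bytes.

    Raises RuntimeError when the variable is missing or empty, rather than
    silently falling back to a random per-process key.
    """
    value = environ.get(name)
    if not value:
        raise RuntimeError(
            f"{name} is not set; all workers need the same secret key"
        )
    return value.encode("utf-8")


# Intended use in the app module (hypothetical), replacing os.urandom(32):
#     app.config['SECRET_KEY'] = load_secret_key()
```

Failing loudly when the variable is missing is a deliberate choice here: a random fallback would bring back the situation where each worker signs with its own key.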