Welcome to part 4 of our sentiment analysis application with Dash and Python. In this part, we're going to tie everything together so far to create a basic, live-updating graph of Twitter sentiment for a term that we choose. To do this, all we need to do is take our updates and apply them to the Live Graphs with Dash tutorial code, which was:
```python
import dash
from dash.dependencies import Output, Event
import dash_core_components as dcc
import dash_html_components as html
import plotly
import random
import plotly.graph_objs as go
from collections import deque

X = deque(maxlen=20)
X.append(1)
Y = deque(maxlen=20)
Y.append(1)

app = dash.Dash(__name__)
app.layout = html.Div(
    [
        dcc.Graph(id='live-graph', animate=True),
        dcc.Interval(
            id='graph-update',
            interval=1*1000
        ),
    ]
)

@app.callback(Output('live-graph', 'figure'),
              events=[Event('graph-update', 'interval')])
def update_graph_scatter():
    X.append(X[-1]+1)
    Y.append(Y[-1]+Y[-1]*random.uniform(-0.1, 0.1))

    data = plotly.graph_objs.Scatter(
            x=list(X),
            y=list(Y),
            name='Scatter',
            mode='lines+markers'
            )

    return {'data': [data],
            'layout': go.Layout(xaxis=dict(range=[min(X), max(X)]),
                                yaxis=dict(range=[min(Y), max(Y)]))}

if __name__ == '__main__':
    app.run_server(debug=True)
```
All we need to do here is change a few minor things, and we'll be all set. Our main goal is to replace the X and Y variables with our own data. To begin, we need to connect to the database. Ideally, we would connect once at the top of this script, but Dash uses threads, and the SQLite connection object won't appreciate that. If we do it anyway, we will get the following error:

SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 7828 and this is thread id 14092

You also won't see that error unless you're logging errors. Instead, we need to establish the connection object inside of the update graph function. As usual, there may actually be a better way than this, I just don't know it!
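To see why the module-level connection fails, here is a minimal, standalone reproduction (my own sketch, not part of the tutorial code): the connection is created in the main thread, then used from a worker thread, the way Dash's callback threads would use it.

```python
import sqlite3
import threading

# Create the connection in the main thread, as we would at the
# top of the Dash script.
conn = sqlite3.connect(":memory:")

errors = []

def use_connection():
    # By default, sqlite3 connections refuse to be used from a
    # thread other than the one that created them.
    try:
        conn.execute("SELECT 1")
    except sqlite3.ProgrammingError as e:
        errors.append(str(e))

t = threading.Thread(target=use_connection)
t.start()
t.join()

print(errors[0])  # ...created in a thread can only be used in that same thread...
```

Opening a fresh connection inside the callback, as we do below, sidesteps this entirely.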
Let's start with adding our imports:
```python
import sqlite3
import pandas as pd
```
Next, inside of our update_graph_scatter function, we'll add:
```python
    conn = sqlite3.connect('twitter.db')
    c = conn.cursor()
    df = pd.read_sql("SELECT * FROM sentiment WHERE tweet LIKE '%olympic%' ORDER BY unix DESC LIMIT 1000", conn)
    df.sort_values('unix', inplace=True)
    df['sentiment_smoothed'] = df['sentiment'].rolling(int(len(df)/5)).mean()
    df.dropna(inplace=True)
```
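To make the smoothing step concrete, here's a quick toy example (the numbers are made up) of what the rolling().mean() plus dropna() combination does. Raw compound sentiment scores jump around a lot tweet-to-tweet, so we average over a sliding window; rows at the start that don't have a full window become NaN and get dropped.

```python
import pandas as pd

# Toy data standing in for the sentiment column (values are made up)
df = pd.DataFrame({"sentiment": [0.0, 1.0, 0.0, 1.0, 0.0, 1.0]})

# Rolling mean over a window of 2; the first row has no complete
# window, so it becomes NaN and is removed by dropna, just like
# in the tutorial code (which uses a window of len(df)/5).
df["sentiment_smoothed"] = df["sentiment"].rolling(2).mean()
df.dropna(inplace=True)

print(df["sentiment_smoothed"].tolist())  # [0.5, 0.5, 0.5, 0.5, 0.5]
```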
Now let's create the X and Y variables:
```python
    X = df.unix.values[-100:]
    Y = df.sentiment_smoothed.values[-100:]
```
Now, the rest stays the same. Let's just delete X and Y from the top of the script (where we define them as deque objects). The full script:
```python
import dash
from dash.dependencies import Output, Event
import dash_core_components as dcc
import dash_html_components as html
import plotly
import random
import plotly.graph_objs as go
from collections import deque
import sqlite3
import pandas as pd

#popular topics: google, olympics, trump, gun, usa

app = dash.Dash(__name__)
app.layout = html.Div(
    [
        html.H2('Live Twitter Sentiment'),
        dcc.Graph(id='live-graph', animate=True),
        dcc.Interval(
            id='graph-update',
            interval=1*1000
        ),
    ]
)

@app.callback(Output('live-graph', 'figure'),
              events=[Event('graph-update', 'interval')])
def update_graph_scatter():
    try:
        conn = sqlite3.connect('twitter.db')
        c = conn.cursor()
        df = pd.read_sql("SELECT * FROM sentiment WHERE tweet LIKE '%olympic%' ORDER BY unix DESC LIMIT 1000", conn)
        df.sort_values('unix', inplace=True)
        df['sentiment_smoothed'] = df['sentiment'].rolling(int(len(df)/5)).mean()
        df.dropna(inplace=True)

        X = df.unix.values[-100:]
        Y = df.sentiment_smoothed.values[-100:]

        data = plotly.graph_objs.Scatter(
                x=X,
                y=Y,
                name='Scatter',
                mode='lines+markers'
                )

        return {'data': [data],
                'layout': go.Layout(xaxis=dict(range=[min(X), max(X)]),
                                    yaxis=dict(range=[min(Y), max(Y)]))}

    except Exception as e:
        with open('errors.txt', 'a') as f:
            f.write(str(e))
            f.write('\n')

if __name__ == '__main__':
    app.run_server(debug=True)
```
For this to be streaming live, you will need to have the following program also running, with the API credentials filled out:
```python
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import json
import sqlite3
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from unidecode import unidecode
import time

analyzer = SentimentIntensityAnalyzer()

conn = sqlite3.connect('twitter.db')
c = conn.cursor()

def create_table():
    try:
        c.execute("CREATE TABLE IF NOT EXISTS sentiment(unix REAL, tweet TEXT, sentiment REAL)")
        c.execute("CREATE INDEX fast_unix ON sentiment(unix)")
        c.execute("CREATE INDEX fast_tweet ON sentiment(tweet)")
        c.execute("CREATE INDEX fast_sentiment ON sentiment(sentiment)")
        conn.commit()
    except Exception as e:
        print(str(e))

create_table()

#consumer key, consumer secret, access token, access secret.
ckey=""
csecret=""
atoken=""
asecret=""

class listener(StreamListener):

    def on_data(self, data):
        try:
            data = json.loads(data)
            tweet = unidecode(data['text'])
            time_ms = data['timestamp_ms']
            vs = analyzer.polarity_scores(tweet)
            sentiment = vs['compound']
            print(time_ms, tweet, sentiment)
            c.execute("INSERT INTO sentiment (unix, tweet, sentiment) VALUES (?, ?, ?)",
                      (time_ms, tweet, sentiment))
            conn.commit()
        except KeyError as e:
            print(str(e))
        return(True)

    def on_error(self, status):
        print(status)

while True:
    try:
        auth = OAuthHandler(ckey, csecret)
        auth.set_access_token(atoken, asecret)
        twitterStream = Stream(auth, listener())
        twitterStream.filter(track=["a","e","i","o","u"])
    except Exception as e:
        print(str(e))
        time.sleep(5)
```
I made a few small changes to the above script. One wraps the stream in a try/except with a retry loop, so errors don't break it. The other adds an index on every column; I'm just poking around with getting queries as fast as I can.
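As a quick sanity check on those indexes (this experiment is my own, not from the original tutorial), you can ask SQLite whether an index is actually being picked up with EXPLAIN QUERY PLAN. One caveat worth knowing: the LIKE '%olympic%' filter can't use the tweet index at all, since a plain B-tree index doesn't help when the pattern starts with a wildcard, but the ORDER BY unix should benefit from fast_unix.

```python
import sqlite3

# Recreate the table and the unix index in an in-memory database
conn = sqlite3.connect(":memory:")
c = conn.cursor()
c.execute("CREATE TABLE sentiment(unix REAL, tweet TEXT, sentiment REAL)")
c.execute("CREATE INDEX fast_unix ON sentiment(unix)")

# Ask SQLite how it would execute the dashboard's query
plan = c.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT * FROM sentiment ORDER BY unix DESC LIMIT 1000"
).fetchall()

print(plan)  # the plan should mention fast_unix rather than a sort over a full scan
```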
If you have all this, you should see something like:
Now, if you wanted, you could manually edit the LIKE statement, re-save the script, and the chart would update for whatever term you put in there. While this does work, it'd be better if we made that search term a part of the GUI! Let's start working on that and other things next!