Streaming Tweets and Sentiment from Twitter in Python - Sentiment Analysis GUI with Dash and Python p.2

Hello and welcome to another sentiment analysis tutorial. This time, we're going to save our tweets, their sentiment, and some other features to a database. My plan is to combine this into a Dash application for analysis and visualization of Twitter sentiment on varying topics.

In this tutorial, we're basically just going to tie together concepts from the SQLite and Twitter streaming API examples.

To make our app, we want to stream tweets live so that our Dash app can read them. The Twitter streaming API doesn't play well with frequent disconnecting and reconnecting, so rather than changing the streamed keyword on the fly, we want to stream everything and filter it later.

To use Tweepy, do a pip install tweepy. Here's a simple Twitter streaming script that streams tweets containing "car":

from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener


#consumer key, consumer secret, access token, access secret.
ckey="fsdfasdfsafsffa"
csecret="asdfsadfsadfsadf"
atoken="asdf-aassdfs"
asecret="asdfsadfsdafsdafs"

class listener(StreamListener):

    def on_data(self, data):
        # called with the raw JSON string for every message on the stream
        print(data)
        return(True)

    def on_error(self, status):
        # called with the HTTP status code if the stream errors out
        print(status)

auth = OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)

twitterStream = Stream(auth, listener())
twitterStream.filter(track=["car"])

You will need to set up a Twitter application on your account to get the API keys and access tokens above.
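
Rather than hardcoding keys into a script you might share, you could pull them from environment variables instead. Here's a minimal sketch; the TWITTER_* names are just placeholders I'm choosing, not anything Tweepy or Twitter requires:

import os

# assumed variable names -- set these in your shell first, e.g.:
#   export TWITTER_CKEY="..."
ckey = os.environ["TWITTER_CKEY"]
csecret = os.environ["TWITTER_CSECRET"]
atoken = os.environ["TWITTER_ATOKEN"]
asecret = os.environ["TWITTER_ASECRET"]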

From here, we want to stream these tweets into a database. We'll use SQLite for that. SQLite comes with Python, so you should already have it. Let's start by creating our database and table. With SQLite, the act of connecting to a database is enough to create it if it doesn't already exist. We do need to create any tables and specify their structure, though. For example, let's do:

import sqlite3

# connecting creates twitter.db if it doesn't already exist
conn = sqlite3.connect('twitter.db')
c = conn.cursor()

def create_table():
    c.execute("CREATE TABLE IF NOT EXISTS sentiment(unix REAL, tweet TEXT, sentiment REAL)")
    conn.commit()

create_table()

I may change this table's structure later, but, for now, I plan to just track time, tweet, and sentiment. We can calculate anything further on the fly. If we find there's some super common operation, like part-of-speech tagging, we can do that as we insert into the database. That said, for now, let's keep the database insertions as simple and as reasonably fast as we can.
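
To give a rough idea of what calculating "on the fly" could look like, here's a sketch of a read-side query that derives a smoothed sentiment column when we load the data, rather than storing it in the table. This assumes you have pandas installed; the 1000-row limit and 50-tweet window are arbitrary choices of mine:

import sqlite3
import pandas as pd

conn = sqlite3.connect('twitter.db')

# grab the most recent rows, then compute a rolling average at read time
df = pd.read_sql("SELECT * FROM sentiment ORDER BY unix DESC LIMIT 1000", conn)
df = df.sort_values('unix')
df['sentiment_smoothed'] = df['sentiment'].rolling(50).mean()
print(df.tail())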

Now, we just combine everything: the sentiment analysis, the Twitter streaming, and the SQLite:

from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import json
import sqlite3
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from unidecode import unidecode

analyzer = SentimentIntensityAnalyzer()

conn = sqlite3.connect('twitter.db')
c = conn.cursor()

def create_table():
    c.execute("CREATE TABLE IF NOT EXISTS sentiment(unix REAL, tweet TEXT, sentiment REAL)")
    conn.commit()
create_table()

#consumer key, consumer secret, access token, access secret.
ckey=""
csecret=""
atoken=""
asecret=""

class listener(StreamListener):

    def on_data(self, data):
        try:
            data = json.loads(data)
            tweet = unidecode(data['text'])  # normalize the text to plain ASCII
            time_ms = data['timestamp_ms']
            vs = analyzer.polarity_scores(tweet)
            sentiment = vs['compound']  # single -1 to 1 score for the whole tweet
            print(time_ms, tweet, sentiment)
            c.execute("INSERT INTO sentiment (unix, tweet, sentiment) VALUES (?, ?, ?)",
                      (time_ms, tweet, sentiment))
            conn.commit()

        except KeyError as e:
            # not every stream message is a tweet; limit/delete notices lack 'text'
            print(str(e))
        return(True)

    def on_error(self, status):
        print(status)

auth = OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)

twitterStream = Stream(auth, listener())
twitterStream.filter(track=["a","e","i","o","u"])

I do not know of a better way to "track everything" than just tracking all of the vowels. I've tried tracking spaces, which has been problematic in the past. As far as I know, you must supply something to filter by, and the five vowels catch nearly every tweet.

Since Dash runs in threads, we are going to run this script separately from our Dash app, and that's all there is to it. We want to track everything we can here, and filter later. You might be thinking that we could use the Dash app to change what we're tracking dynamically. Unfortunately, I don't think that's going to be possible, because we do not want to break the connection to Twitter frequently; otherwise our connection will be disallowed for a short time. There may be some way to update the tracked terms live on an open connection, but I do not know it.
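
"Filtering later" just means matching keywords against the tweet column at read time. As a minimal sketch, a separate reader script could narrow the firehose to any topic after the fact (the search term here is an arbitrary example):

import sqlite3

conn = sqlite3.connect('twitter.db')
c = conn.cursor()

# parameterized LIKE query: pull the most recent tweets mentioning a term
c.execute("SELECT unix, tweet, sentiment FROM sentiment WHERE tweet LIKE ? ORDER BY unix DESC LIMIT 10",
          ('%olympics%',))
for row in c.fetchall():
    print(row)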

I've also added some more error handling, since I was still seeing the stream disconnect from time to time. If that happens, we'll wait a few seconds and try again:

from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import json
import sqlite3
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from unidecode import unidecode
import time

analyzer = SentimentIntensityAnalyzer()

#consumer key, consumer secret, access token, access secret.
ckey=""
csecret=""
atoken=""
asecret=""

conn = sqlite3.connect('twitter.db')
c = conn.cursor()

def create_table():
    try:
        c.execute("CREATE TABLE IF NOT EXISTS sentiment(unix REAL, tweet TEXT, sentiment REAL)")
        # indexes speed up the lookups our Dash app will be doing constantly
        c.execute("CREATE INDEX IF NOT EXISTS fast_unix ON sentiment(unix)")
        c.execute("CREATE INDEX IF NOT EXISTS fast_tweet ON sentiment(tweet)")
        c.execute("CREATE INDEX IF NOT EXISTS fast_sentiment ON sentiment(sentiment)")
        conn.commit()
    except Exception as e:
        print(str(e))
create_table()



class listener(StreamListener):

    def on_data(self, data):
        try:
            data = json.loads(data)
            tweet = unidecode(data['text'])  # normalize the text to plain ASCII
            time_ms = data['timestamp_ms']
            vs = analyzer.polarity_scores(tweet)
            sentiment = vs['compound']  # single -1 to 1 score for the whole tweet
            print(time_ms, tweet, sentiment)
            c.execute("INSERT INTO sentiment (unix, tweet, sentiment) VALUES (?, ?, ?)",
                      (time_ms, tweet, sentiment))
            conn.commit()

        except KeyError as e:
            # not every stream message is a tweet; limit/delete notices lack 'text'
            print(str(e))
        return(True)

    def on_error(self, status):
        print(status)


while True:

    try:
        auth = OAuthHandler(ckey, csecret)
        auth.set_access_token(atoken, asecret)
        twitterStream = Stream(auth, listener())
        twitterStream.filter(track=["a","e","i","o","u"])
    except Exception as e:
        # if the stream drops, log the error, pause, and reconnect
        print(str(e))
        time.sleep(5)

Anyway, now that we have this, let's work on reading from the database and some basic manipulation next.

The next tutorial: Reading from our sentiment database - Sentiment Analysis GUI with Dash and Python p.3

  • Intro - Data Visualization Applications with Dash and Python p.1
  • Interactive User Interface - Data Visualization GUIs with Dash and Python p.2
  • Dynamic Graph based on User Input - Data Visualization GUIs with Dash and Python p.3
  • Live Graphs - Data Visualization GUIs with Dash and Python p.4
  • Vehicle Data App Example - Data Visualization GUIs with Dash and Python p.5
  • Out of the Box Sentiment Analysis options with Python using VADER Sentiment and TextBlob
  • Streaming Tweets and Sentiment from Twitter in Python - Sentiment Analysis GUI with Dash and Python p.2
  • Reading from our sentiment database - Sentiment Analysis GUI with Dash and Python p.3
  • Live Twitter Sentiment Graph - Sentiment Analysis GUI with Dash and Python p.4
  • Dynamically Graphing Terms for Sentiment - Sentiment Analysis GUI with Dash and Python p.5
  • Deploy Dash App to a VPS web server - Data Visualization Applications with Dash and Python p.11