In this Quantopian tutorial, we're going to be covering the Pipeline API. If you recall, leading up to this we were often limited in what we could do, usually by the 500-security maximum on our stock universe. The Pipeline API allows you to select from 8000+ securities at a time, which opens the door to many new opportunities.
The Pipeline API can reference 8000+ securities at a time, but you can still only reference 500 in realtime. Thus, you will most likely use the Pipeline API in conjunction with update_universe.
In order to take 8000 securities and get to a number small enough to work with your universe, you will be using Factors and Filters, many times together. A factor is used to return numbers, and a filter returns a boolean. For example, you may use a factor first to apply a 50 and a 200 moving average to the companies. Then you may employ a filter to return all companies that have a 50 moving average above the 200 moving average.
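To make the factor/filter distinction concrete, here is a minimal plain-Python sketch. It does not use the Quantopian API at all: the tickers, prices, and the tiny window lengths (2 and 6 standing in for 50 and 200) are all made up for illustration.

```python
# Plain-Python illustration only; none of this is Quantopian API.

def simple_moving_average(prices, window):
    """Mean of the last `window` prices."""
    recent = prices[-window:]
    return sum(recent) / len(recent)

# Hypothetical closing prices for three made-up tickers (oldest first).
closes = {
    "AAA": [10.0, 11.0, 12.0, 13.0, 14.0, 15.0],
    "BBB": [20.0, 19.0, 18.0, 17.0, 16.0, 15.0],
    "CCC": [5.0, 5.0, 5.0, 5.0, 5.0, 5.0],
}

SHORT_WINDOW, LONG_WINDOW = 2, 6  # stand-ins for 50 and 200

# Factors: one number per security.
short_ma = {s: simple_moving_average(p, SHORT_WINDOW) for s, p in closes.items()}
long_ma = {s: simple_moving_average(p, LONG_WINDOW) for s, p in closes.items()}

# Filter: one boolean per security.
short_above_long = {s: short_ma[s] > long_ma[s] for s in closes}

# Keep only the securities that pass the filter.
passing = [s for s, keep in short_above_long.items() if keep]
print(passing)  # ['AAA']
```

Only the rising ticker survives: its short average (14.5) sits above its long average (12.5), while the falling and flat tickers fail the comparison.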
While Quantopian provides many built-in factors, you can also create custom factors.
That's the general idea of the pipeline, but let's actually employ the Pipeline API in an example to get a better feel for it. Let's make a trading algorithm that wouldn't have been possible without the Pipeline API. Our algorithm will take two simple moving averages, a 50 and a 200. From here, we're going to look for the companies that have the highest 50MA/200MA ratios. A 50MA that is higher than the 200MA means the price is at least rising, but we want to find the companies that are showing the strongest rises, so this will be an example of a momentum strategy. In the past, we would have first needed to filter the 8000 companies in some way, most likely by price, before even beginning to look at transformations of price or other data. Now, we can start by looking at over 8000 companies, and then filter down. To start, we need a few imports:
from quantopian.pipeline import Pipeline
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import SimpleMovingAverage
Above, we import the Pipeline itself, then we import the ability to attach a pipeline and to get its output. attach_pipeline is called in the initialize function, which notifies Quantopian that we intend to use the pipeline. pipeline_output is used to return a dataframe of the data, where the index is the security. Thus, this is something you would call every day in a daily trading algorithm, probably in a before_trading_start function.
Next, we import USEquityPricing from the built-in data for the pipeline. The Quantopian API identifies these built-in datasets as a type of DataSet. You may be confused by this. A DataSet actually has no data, which is fairly odd out of the gate, but we'll reconcile with that soon enough. Finally, we're bringing in the SimpleMovingAverage factor, which is one of the pre-made factors at our disposal.
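One way to think about a DataSet holding no data is that its columns are only named references, and the actual values are supplied when the computation is run. The sketch below illustrates that idea in plain Python. The Column and MovingAverage classes and the run function are hypothetical stand-ins invented for this example, not Quantopian's classes, and the prices are made up.

```python
# Conceptual sketch only; these classes are hypothetical, not Quantopian's.

class Column:
    """A named reference to data; it holds no values itself."""
    def __init__(self, name):
        self.name = name

class MovingAverage:
    """A computation described in terms of a column and a window."""
    def __init__(self, column, window_length):
        self.column = column
        self.window_length = window_length

    def compute(self, values):
        window = values[-self.window_length:]
        return sum(window) / len(window)

def run(term, loaded_data):
    """Stand-in 'engine': look up the real values only at run time."""
    return {
        security: term.compute(columns[term.column.name])
        for security, columns in loaded_data.items()
    }

close = Column("close")                      # no prices attached yet
ma3 = MovingAverage(close, window_length=3)  # still no prices

# Made-up data, supplied later when the computation is actually run.
loaded_data = {
    "AAA": {"close": [1.0, 2.0, 3.0, 4.0]},
    "BBB": {"close": [8.0, 6.0, 4.0, 2.0]},
}
result = run(ma3, loaded_data)
print(result)  # {'AAA': 3.0, 'BBB': 4.0}
```

The point of the design is that describing a computation and executing it are separate steps, which is roughly why a factor can be defined in initialize before any prices exist.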
Next, all of our algorithms start with the initialize function when running, so we'll start with that. As mentioned already, this is where we must declare that we're going to be using the Pipeline.
def initialize(context):
    pipe = Pipeline()
    attach_pipeline(pipe, 'pipeline_tutorial')
We begin by defining our pipe, and then attaching it under the name 'pipeline_tutorial'. Now let's create some factors (still in our initialize function):
    _50ma = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=50)
    _200ma = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=200)
Now that we have created these factors, let's add them to our Pipeline:
    pipe.add(_50ma, '_50ma')
    pipe.add(_200ma, '_200ma')
Simple as that. Now, we mentioned we wanted to perform a ratio. Doing that is actually super simple:
pipe.add(_50ma/_200ma, 'ma_ratio')
While we don't have to, we can also filter the data. In our case, we're only looking for companies to go long on. Eventually, we would also want to short companies to improve beta. For now, however, let's assume we're only looking for companies that are rising. We can apply a filter like so:
pipe.set_screen(_50ma/_200ma > 1.0)
In that case, we filtered by a division of factors, but you can of course filter by a single factor. At this point, our initialize function is complete and we're ready to dive into our before_trading_start function.
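As a rough picture of what the ratio column and the screen do to the pipeline's output, here is a plain-Python sketch. The moving-average values are made up, and the dictionaries merely stand in for rows of the output; this is not how Quantopian implements it.

```python
# Made-up moving-average values; not real market data.
rows = {
    "AAA": {"_50ma": 110.0, "_200ma": 100.0},
    "BBB": {"_50ma": 45.0, "_200ma": 50.0},
    "CCC": {"_50ma": 30.0, "_200ma": 24.0},
}

# Rough equivalent of adding the 'ma_ratio' column.
for values in rows.values():
    values["ma_ratio"] = values["_50ma"] / values["_200ma"]

# Rough equivalent of the screen: drop rows whose ratio is not above 1.0.
screened = {s: v for s, v in rows.items() if v["ma_ratio"] > 1.0}
print(sorted(screened))  # ['AAA', 'CCC']
```

BBB is removed because its 50MA is below its 200MA, so its ratio falls under 1.0.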
def before_trading_start(context, data):
    output = pipeline_output('pipeline_tutorial')
    context.my_universe = output.sort('ma_ratio', ascending=False).iloc[:300]
    update_universe(context.my_universe.index)
As a reminder, context is what you use to reference your specific algorithm and performance. data is what is used to reference things outside of your portfolio.
The output variable is what is going to house our pipeline data, which we called "pipeline_tutorial." Our actual pipe is a Pipeline object, but the pipeline_output function gives us a dataframe back, allowing us to do all the fun stuff that we can do with pandas DataFrames. For now, we just take the top-performing ratios by sorting the dataframe by the ma_ratio column in descending order, and then taking the first 300 rows. Finally, we update our universe with this information. Our universe is what has a maximum of 500 companies, but up to this point we've been able to reference the entire list of companies and apply those pre-made factors, or create our own.
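The sort-then-slice step can be pictured without pandas at all. The sketch below uses made-up ratios and a top 2 in place of a top 300, purely for illustration.

```python
# Made-up ma_ratio values keyed by hypothetical ticker.
ma_ratio = {"AAA": 1.10, "BBB": 1.02, "CCC": 1.25, "DDD": 1.07}

TOP_N = 2  # stand-in for the 300 used in the algorithm

# Sort by ratio, highest first, then keep the first TOP_N entries.
ranked = sorted(ma_ratio.items(), key=lambda item: item[1], reverse=True)
top = [security for security, _ in ranked[:TOP_N]]
print(top)  # ['CCC', 'AAA']
```

Note that DataFrame.sort, used in the algorithm, was removed from later pandas releases; on a newer pandas the same step would be written with sort_values('ma_ratio', ascending=False).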
The final function we need is the handle_data function. Let's start building that:
def handle_data(context, data):
    log.info("\n" + str(context.my_universe.head()))
    log.info("\n" + str(len(context.my_universe)))
For now, we're just outputting some data, just so we can see what we're working with. At this point, the full code is:
from quantopian.pipeline import Pipeline
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import SimpleMovingAverage


def initialize(context):
    pipe = Pipeline()
    attach_pipeline(pipe, 'pipeline_tutorial')

    # Two factors: 50- and 200-period simple moving averages of the close
    _50ma = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=50)
    _200ma = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=200)
    pipe.add(_50ma, '_50ma')
    pipe.add(_200ma, '_200ma')
    pipe.add(_50ma/_200ma, 'ma_ratio')

    # Keep only securities whose 50MA is above their 200MA
    pipe.set_screen(_50ma/_200ma > 1.0)


def before_trading_start(context, data):
    output = pipeline_output('pipeline_tutorial')
    # Highest ratios first, then the top 300 rows
    context.my_universe = output.sort('ma_ratio', ascending=False).iloc[:300]
    update_universe(context.my_universe.index)


def handle_data(context, data):
    log.info("\n" + str(context.my_universe.head()))
    log.info("\n" + str(len(context.my_universe)))
In the next tutorial, we'll incorporate trading based on this strategy.