Use minute bar machi.na challenge csv data in zipline with a custom data bundle

Quantopian’s zipline, a Pythonic algorithmic trading library, can run trading algorithm simulations on one-minute open-high-low-close-volume (OHLCV) bars.

Here is an example of how to create a zipline custom data bundle from a local csv file that contains one-minute bar data. It builds on an earlier blog post on how to create custom zipline data bundles. We will then write a simple zipline algo to test it.

The machi.na challenge is a machine learning exercise to predict the value of SPY based on trading signals.  If you enter, you can download a csv file that contains OHLCV data for SPY along with a number of training signals to be used in the challenge.

The file has almost 200 columns; the first few rows and columns look like this:



-> head -3 data_2010.csv | awk -F"," '{print $1,$2,$3,$4,$5,$6,$7}'
Date SpyOpen SpyHigh SpyLow SpyClose SpyVolume ts_01
20100104T0931 112.38 112.48 112.34 112.42 2314.00 0.000000
20100104T0932 112.41 112.49 112.41 112.45 1282412.00 0.000000
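
As a quick sanity check before involving zipline, the sample rows above can be loaded with pandas. The sketch below is an illustration only and is not part of the bundle. It uses Python 3 syntax, an in-memory copy of the two data rows shown above, and an explicit timestamp format (`%Y%m%dT%H%M`) that is assumed from those rows. The names `sample`, `ohlcv` and `bars` are made up for this example.

```python
import io

import pandas as pd

# The header and the two data rows shown above, truncated to seven columns.
sample = io.StringIO(
    "Date,SpyOpen,SpyHigh,SpyLow,SpyClose,SpyVolume,ts_01\n"
    "20100104T0931,112.38,112.48,112.34,112.42,2314.00,0.000000\n"
    "20100104T0932,112.41,112.49,112.41,112.45,1282412.00,0.000000\n"
)

# Map the csv column names to the names zipline uses for bar data.
ohlcv = {
    "SpyOpen": "open",
    "SpyHigh": "high",
    "SpyLow": "low",
    "SpyClose": "close",
    "SpyVolume": "volume",
}

# Read only the timestamp and the five price/volume columns.
bars = pd.read_csv(sample, index_col="Date", usecols=["Date"] + list(ohlcv))

# Parse the timestamps with an explicit format (assumed from the sample rows).
bars.index = pd.to_datetime(bars.index, format="%Y%m%dT%H%M")
bars = bars.rename(columns=ohlcv)

print(bars)
```

Reading only the OHLCV columns keeps the signal columns out of the frame; whether that is desirable for a real bundle is a design choice, and the ingest function in this post keeps every column.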

 

If you are not familiar with how zipline custom data bundles are created from local csv files, have a look here first.

Here is our custom ingest function:

 


#
# Ingest stock csv files to create a zipline data bundle
#

import numpy as np
import pandas as pd

boDebug=True # Set True to get trace messages

def machina(symbols,start=None,end=None):

    # Materialize symbols as a tuple so that we can iterate over it more than once.
    # (It could be a generator, and a generator can only be consumed once.)
    tuSymbols = tuple(symbols)

    if boDebug:
        print "entering machina.  tuSymbols=",tuSymbols

    # Define our custom ingest function
    def ingest(environ,
               asset_db_writer,
               minute_bar_writer,
               daily_bar_writer,
               adjustment_writer,
               calendar,
               cache,
               show_progress,
               output_dir,
               # pass these as defaults to make them 'nonlocal' in py2
               start=start,
               end=end):

        if boDebug:
            print "entering ingest and creating blank dfMetadata"

        dfMetadata = pd.DataFrame(np.empty(len(tuSymbols), dtype=[
            ('start_date', 'datetime64[ns]'),
            ('end_date', 'datetime64[ns]'),
            ('auto_close_date', 'datetime64[ns]'),
            ('symbol', 'object'),
        ]))

        if boDebug:
            print "dfMetadata",type(dfMetadata)
            print dfMetadata.describe
            print

        # The bar writer expects an iterable (a list or a generator) of
        # (sid, DataFrame) tuples: an integer sid and a DataFrame with the data.
        # We build that list here and pass it to minute_bar_writer below.

        liData=[]
        iSid=0
        for S in tuSymbols:
            # IFIL="~/machina_contest/machina_mini.csv"
            IFIL="~/machina_contest/data_2010.csv"
            if boDebug:
                print "S=",S,"IFIL=",IFIL
            dfData=pd.read_csv(IFIL,index_col='Date',parse_dates=True).sort_index()
            # csv timestamps are in US Eastern time, but pandas doesn't know
            # that yet, so tell it
            dfData.index=dfData.index.tz_localize('US/Eastern')
            # zipline needs data in UTC, so convert it
            dfData.index=dfData.index.tz_convert('UTC')
            # But the zipline ingest function expects naive timestamps,
            # so remove the tzinfo (the values stay in UTC).
            dfData.index=dfData.index.tz_convert(None)
            if boDebug:
                print "read_csv dfData",type(dfData),"length",len(dfData)
                print
            dfData.rename(
                columns={
                    'SpyOpen': 'open',
                    'SpyHigh': 'high',
                    'SpyLow': 'low',
                    'SpyClose': 'close',
                    'SpyVolume': 'volume',
                },
                inplace=True,
            )
            liData.append((iSid,dfData))

            # The start date is the date of the first trade
            start_date = dfData.index[0]
            if boDebug:
                print "start_date",type(start_date),start_date,start_date.tzinfo

            # the end date is the date of the last trade
            end_date = dfData.index[-1]
            if boDebug:
                print "end_date",type(end_date),end_date,end_date.tzinfo

            # The auto_close date is the day after the last trade.
            ac_date = end_date + pd.Timedelta(days=1)
            if boDebug:
                print "ac_date",type(ac_date),ac_date,ac_date.tzinfo

            # Update our meta data
            dfMetadata.iloc[iSid] = start_date, end_date, ac_date, S

            iSid += 1

        if boDebug:
            print "liData",type(liData),"length",len(liData)
            print "Now calling minute_bar_writer"

        # daily_bar_writer.write(liData, show_progress=False)
        minute_bar_writer.write(liData, show_progress=False)

        # Hardcode the exchange to "YAHOO" for all assets and (elsewhere)
        # register "YAHOO" to resolve to the NYSE calendar, because these are
        # all equities and thus can use the NYSE calendar.
        dfMetadata['exchange'] = "YAHOO"

        if boDebug:
            print "returned from minute_bar_writer"
            print "calling asset_db_writer"
            print "dfMetadata",type(dfMetadata)
            print dfMetadata
            print

        # symbol_map maps each symbol to its sid. It is only printed here
        # and is not passed to any writer.
        symbol_map = pd.Series(dfMetadata.symbol.index, dfMetadata.symbol)
        if boDebug:
            print "symbol_map",type(symbol_map)
            print symbol_map
            print

        asset_db_writer.write(equities=dfMetadata)

        if boDebug:
            print "returned from asset_db_writer"
            print "calling adjustment_writer"

        adjustment_writer.write()

        if boDebug:
            print "returned from adjustment_writer"
            print "now leaving ingest function"


    if boDebug:
        print "about to return ingest function"
    return ingest
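
The three index conversions in the ingest function are easier to follow in isolation. This is a minimal sketch, assuming only pandas. The first timestamp is the first bar from the csv sample and the second is a hypothetical summer bar, added to show that daylight saving time is taken into account.

```python
import pandas as pd

# Wall-clock timestamps as written in the csv (no time zone attached yet).
local = pd.DatetimeIndex(["2010-01-04 09:31", "2010-07-01 09:31"])

aware = local.tz_localize("US/Eastern")  # declare them as US Eastern time
utc = aware.tz_convert("UTC")            # shift to UTC
naive_utc = utc.tz_convert(None)         # drop the tzinfo, values stay in UTC

print(naive_utc[0])  # 2010-01-04 14:31:00
print(naive_utc[1])  # 2010-07-01 13:31:00
```

The winter bar moves by five hours and the summer bar by four. The first value matches the `start_date` of 14:31 reported in the ingest trace.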

 

Let's ingest it into zipline:



-> zipline ingest -b machina
entering machina.  tuSymbols= ('SPY',)
about to return ingest function
entering ingest and creating blank dfMetadata
dfMetadata <class 'pandas.core.frame.DataFrame'>
<bound method DataFrame.describe of   start_date   end_date auto_close_date symbol
0 1970-01-01 1970-01-01      1970-01-01   None>

S= SPY IFIL= ~/machina_contest/data_2010.csv
read_csv dfData <class 'pandas.core.frame.DataFrame'> length 97320

start_date <class 'pandas.tslib.Timestamp'> 2010-01-04 14:31:00 None
end_date <class 'pandas.tslib.Timestamp'> 2010-12-31 21:00:00 None
ac_date <class 'pandas.tslib.Timestamp'> 2011-01-01 21:00:00 None
liData <type 'list'> length 1
Now calling minute_bar_writer
returned from minute_bar_writer
calling asset_db_writer
dfMetadata <class 'pandas.core.frame.DataFrame'>
           start_date            end_date     auto_close_date symbol exchange
0 2010-01-04 14:31:00 2010-12-31 21:00:00 2011-01-01 21:00:00    SPY    YAHOO

symbol_map <class 'pandas.core.series.Series'>
symbol
SPY    0
dtype: int64


returned from asset_db_writer
calling adjustment_writer
returned from adjustment_writer
now leaving ingest function
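
The metadata row in the trace above can also be built directly from the index of each bar frame. The following is a sketch under assumptions, not the code that produced the trace: `build_metadata` is a hypothetical helper, and it replaces the pre-allocated `np.empty` frame with a list of dicts. The two timestamps are the first and last bars reported in the trace.

```python
import pandas as pd


def build_metadata(bars_by_symbol):
    """Return one metadata row per (symbol, bars) pair.

    The row position in the result is used as the sid, as in the
    ingest function in this post.
    """
    rows = []
    for symbol, bars in bars_by_symbol:
        start, end = bars.index[0], bars.index[-1]
        rows.append({
            "start_date": start,
            "end_date": end,
            # Auto-close one day after the last bar, as in the ingest function.
            "auto_close_date": end + pd.Timedelta(days=1),
            "symbol": symbol,
            "exchange": "YAHOO",
        })
    return pd.DataFrame(rows)


# An empty frame carrying only the first and last timestamps from the trace.
index = pd.DatetimeIndex(["2010-01-04 14:31", "2010-12-31 21:00"])
meta = build_metadata([("SPY", pd.DataFrame(index=index))])
print(meta)
```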

 

And confirm that zipline can find it:



-> zipline bundles
entering machina.  tuSymbols= ('SPY',)
about to return ingest function
machina 2016-10-04 13:35:21.953984
quandl <no ingestions>
quantopian-quandl 2016-09-09 13:16:35.114238

 

Now let's write a simple zipline algo to see if we can read it:



from zipline.api import symbol,get_datetime
from pytz import timezone

def initialize(context):
    context.eqSPY=symbol("SPY")
    context.stDate=""  # date of the last bar we printed

def handle_data(context,data):
    # Current simulation time, expressed in US/Eastern
    dtNow=get_datetime(timezone('US/Eastern')).time().strftime("%H:%M")
    ddNow=str(get_datetime(timezone('US/Eastern')).date())
    # Print the open of the first bar of each new day only
    if context.stDate != ddNow:
        fOpen=data.current(context.eqSPY,'open')
        print ddNow,dtNow,fOpen
        context.stDate=ddNow
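
The date-change test in `handle_data` can be checked without a bundle. Here is a minimal sketch of the same idea in plain Python. `first_bar_of_each_day` is a hypothetical helper, and the timestamps are assumed to be in exchange local time already.

```python
from datetime import datetime


def first_bar_of_each_day(timestamps):
    """Yield the first timestamp seen for each calendar date, in input order."""
    last_date = None
    for ts in timestamps:
        if ts.date() != last_date:
            last_date = ts.date()
            yield ts


minutes = [
    datetime(2010, 1, 4, 9, 31),
    datetime(2010, 1, 4, 9, 32),
    datetime(2010, 1, 5, 9, 31),
    datetime(2010, 1, 5, 9, 32),
]

firsts = list(first_bar_of_each_day(minutes))
print([ts.strftime("%Y-%m-%d %H:%M") for ts in firsts])
# ['2010-01-04 09:31', '2010-01-05 09:31']
```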

Now run zipline on the one-minute bar data:



-> zipline run -f test_machina.py --bundle machina --data-frequency minute -s 2010-1-4 -e 2010-1-7
entering machina.  tuSymbols= ('SPY',)
about to return ingest function
2010-01-04 09:31 112.38
2010-01-05 09:31 113.27
2010-01-06 09:31 113.51
2010-01-07 09:31 113.49
[2016-10-04 13:47:14.015330] INFO: Performance: Simulated 4 trading days out of 4.
[2016-10-04 13:47:14.015489] INFO: Performance: first open: 2010-01-04 14:31:00+00:00
[2016-10-04 13:47:14.015580] INFO: Performance: last close: 2010-01-07 21:00:00+00:00

 

Success! We have created a custom data bundle with one-minute bar data and used it in a simple zipline algo!