Create a custom zipline data bundle from local CSV files

Quantopian's open-source zipline - a Pythonic algorithmic trading library - now uses an internal format, called a data bundle, to store open-high-low-close-volume (OHLCV) equity data.  Some examples of how to create bundles are provided in the zipline/data/bundles folder, but they contain a lot of extra functionality for pulling data from web sources such as Yahoo.

Here is a basic example of creating a custom data bundle from local csv files.

The steps to do this are:

  1. Obtain some sample CSV files from Yahoo for symbols not in the default bundle
  2. Create a custom bundle support module (in our case, called "viacsv")
  3. Advise zipline of our bundle by registering it via .zipline/extension.py
  4. Create the bundle
  5. Test our bundle with zipline


IMPORTANT NOTE: This example assumes that your CSV files use NYSE trade dates (also known as the "YAHOO" calendar).  If your CSV files are for an exchange that trades on different days, you must register the bundle with that exchange's zipline calendar.  Look in the zipline directory zipline/utils/calendars for other supported calendars.

For this example, we will use some CSV files for symbols traded on the NYSE exchange.  The first few lines of each CSV file look like this:



-> head -3 N225.csv
Date,Open,High,Low,Close,Volume,Adj Close
2015-10-01,17479.970703,17831.580078,17389.570312,17722.419922,156400,17722.419922
2015-09-30,17193.839844,17460.970703,17179.400391,17388.150391,151800,17388.150391
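As a quick sanity check, the sample rows above can be parsed with pandas outside of zipline.  Note that the rows arrive newest-first, which is why the ingest code calls sort_index() to put them in ascending date order.  A minimal sketch:

```python
import io
import pandas as pd

# The two sample rows shown above, newest-first as Yahoo delivers them
csv_text = """Date,Open,High,Low,Close,Volume,Adj Close
2015-10-01,17479.970703,17831.580078,17389.570312,17722.419922,156400,17722.419922
2015-09-30,17193.839844,17460.970703,17179.400391,17388.150391,151800,17388.150391
"""

# Parse the same way the bundle module does: Date becomes the index,
# and sort_index() flips the newest-first rows into ascending order
df = pd.read_csv(io.StringIO(csv_text), index_col='Date', parse_dates=True).sort_index()

print(df.index[0])   # earliest trade date
print(df.index[-1])  # latest trade date
```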

I use virtualenv, so I created a new file viacsv.py in the folder zipline/data/bundles with this content:



#
# Ingest stock CSV files to create a zipline data bundle
#

import numpy  as np
import pandas as pd

boDebug = True  # Set True to get trace messages

def viacsv(symbols, start=None, end=None):

    # Stash this in memory so that we can iterate over it more than once.
    # (symbols could be a generator, and a generator can only be consumed once.)
    tuSymbols = tuple(symbols)

    if boDebug:
        print "entering viacsv.  tuSymbols=", tuSymbols

    # Define our custom ingest function
    def ingest(environ,
               asset_db_writer,
               minute_bar_writer,  # unused
               daily_bar_writer,
               adjustment_writer,
               calendar,
               cache,
               show_progress,
               output_dir,
               # pass these as defaults to make them 'nonlocal' in py2
               start=start,
               end=end):

        if boDebug:
            print "entering ingest and creating blank dfMetadata"

        dfMetadata = pd.DataFrame(np.empty(len(tuSymbols), dtype=[
            ('start_date', 'datetime64[ns]'),
            ('end_date', 'datetime64[ns]'),
            ('auto_close_date', 'datetime64[ns]'),
            ('symbol', 'object'),
        ]))

        if boDebug:
            print "dfMetadata",type(dfMetadata)
            print dfMetadata.describe
            print

        # We need to feed something that is iterable - like a list or a generator -
        # that is a tuple with an integer for sid and a DataFrame for the data to
        # daily_bar_writer

        liData=[]
        iSid=0
        for S in tuSymbols:
            IFIL="~/notebooks/csv/"+S+".csv"
            if boDebug:
               print "S=",S,"IFIL=",IFIL
            dfData=pd.read_csv(IFIL,index_col='Date',parse_dates=True).sort_index()
            if boDebug:
               print "read_csv dfData",type(dfData),"length",len(dfData)
               print
            dfData.rename(
                columns={
                    'Open': 'open',
                    'High': 'high',
                    'Low': 'low',
                    'Close': 'close',
                    'Volume': 'volume',
                    'Adj Close': 'price',
                },
                inplace=True,
            )
            # Scale volume down: zipline's daily bar writer stores volume
            # as an unsigned 32-bit integer, so very large raw volumes overflow
            dfData['volume'] = dfData['volume'] / 1000
            liData.append((iSid,dfData))

            # the start date is the date of the first trade
            start_date = dfData.index[0]
            if boDebug:
                print "start_date",type(start_date),start_date

            # the end date is the date of the last trade
            end_date = dfData.index[-1]
            if boDebug:
                print "end_date",type(end_date),end_date

            # The auto_close date is the day after the last trade.
            ac_date = end_date + pd.Timedelta(days=1)
            if boDebug:
                print "ac_date",type(ac_date),ac_date

            # Update our meta data
            dfMetadata.iloc[iSid] = start_date, end_date, ac_date, S

            iSid += 1

        if boDebug:
            print "liData",type(liData),"length",len(liData)
            print liData
            print
            print "Now calling daily_bar_writer"

        daily_bar_writer.write(liData, show_progress=False)

        # Hardcode the exchange to "YAHOO" for all assets and (elsewhere)
        # register "YAHOO" to resolve to the NYSE calendar, because the csv files
        # are for equities that traded per the NYSE calendar.
        dfMetadata['exchange'] = "YAHOO"

        if boDebug:
            print "returned from daily_bar_writer"
            print "calling asset_db_writer"
            print "dfMetadata",type(dfMetadata)
            print dfMetadata
            print

        # symbol_map maps each symbol to its sid.  It is not passed to any
        # writer below; it is built (and printed) here only for reference.
        symbol_map = pd.Series(dfMetadata.symbol.index, dfMetadata.symbol)
        if boDebug:
            print "symbol_map",type(symbol_map)
            print symbol_map
            print

        asset_db_writer.write(equities=dfMetadata)

        if boDebug:
            print "returned from asset_db_writer"
            print "calling adjustment_writer"

        # Write an empty adjustments table; we have no split/dividend data
        adjustment_writer.write()

        if boDebug:
            print "returned from adjustment_writer"
            print "now leaving ingest function"

    if boDebug:
       print "about to return ingest function"
    return ingest


The above function produces LOTS of debugging output so that you can follow along with the intermediate steps and learn more about what this module is doing and when zipline calls it.  For regular use, set boDebug=False near the top of the module for quieter operation.

The premise of the module is to create a pandas DataFrame with the date as the index and five columns named exactly as zipline expects (open, high, low, close, volume).  A tuple is formed from an integer security identifier (sid) and the DataFrame, and an iterable of these tuples is fed to the zipline-provided daily_bar_writer.  A second pandas DataFrame holds the metadata, consisting mainly of the start and end dates of the data along with the stock symbol to be associated with each sid.
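Stripped of the zipline writers, the data structures the ingest function builds can be sketched in plain pandas (toy prices and hypothetical symbols, purely for illustration):

```python
import numpy as np
import pandas as pd

symbols = ('AAA', 'BBB')  # hypothetical symbols for illustration

# Pre-allocate the metadata frame, one row per sid, as the module does
dfMetadata = pd.DataFrame(np.empty(len(symbols), dtype=[
    ('start_date', 'datetime64[ns]'),
    ('end_date', 'datetime64[ns]'),
    ('auto_close_date', 'datetime64[ns]'),
    ('symbol', 'object'),
]))

dates = pd.date_range('2015-09-28', periods=3, freq='B')  # three business days
liData = []
for iSid, S in enumerate(symbols):
    # One OHLCV frame per symbol, date-indexed -- what daily_bar_writer expects
    dfData = pd.DataFrame({
        'open':   [10.0, 10.5, 10.2],
        'high':   [10.6, 10.9, 10.4],
        'low':    [ 9.9, 10.4, 10.0],
        'close':  [10.5, 10.8, 10.1],
        'volume': [1000, 1200,  900],
    }, index=dates)
    liData.append((iSid, dfData))  # (sid, frame) tuples feed daily_bar_writer

    # Metadata row: first trade, last trade, auto-close the day after
    dfMetadata.iloc[iSid] = (dfData.index[0], dfData.index[-1],
                             dfData.index[-1] + pd.Timedelta(days=1), S)

print(dfMetadata)
```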

Next, we have to advise zipline of our bundle by adding a few lines to .zipline/extension.py:



from zipline.data.bundles import register
from zipline.data.bundles.viacsv import viacsv

eqSym = {
    "AOI",
    "DJI",
    "GDAXI",
    "GSPC",
    "HSI",
    "N225",
    "NYA",
}

register(
    'csv',    # name this whatever you like
    viacsv(eqSym),
)
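If your data follows a different exchange's schedule, the calendar alias mentioned in the note above can be set up in the same extension.py.  Assuming the zipline version used here exposes get_calendar and register_calendar in zipline.utils.calendars, a sketch would look like this (the "YAHOO" alias matches the exchange string hardcoded in the ingest function):

```python
# In .zipline/extension.py, alongside the register() call above.
# Map the "YAHOO" exchange name used in dfMetadata to the NYSE trading
# calendar, so zipline knows which sessions these daily bars cover.
from zipline.utils.calendars import get_calendar, register_calendar

register_calendar("YAHOO", get_calendar("NYSE"))
```

For an exchange on a different schedule, substitute the appropriate calendar name from zipline/utils/calendars.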

Now we are ready to create our bundle:



->  zipline ingest -b csv
entering viacsv.  tuSymbols= ('GSPC', 'GDAXI', 'HSI', 'AOI', 'DJI', 'NYA', 'N225')
about to return ingest function
entering ingest and creating blank dfMetadata
dfMetadata <class 'pandas.core.frame.DataFrame'>
<bound method DataFrame.describe of   start_date   end_date auto_close_date symbol
0 1970-01-01 1970-01-01      1970-01-01   None
1 1970-01-01 1970-01-01      1970-01-01   None
2 1970-01-01 1970-01-01      1970-01-01   None
3 1970-01-01 1970-01-01      1970-01-01   None
4 1970-01-01 1970-01-01      1970-01-01   None
5 1970-01-01 1970-01-01      1970-01-01   None
6 1970-01-01 1970-01-01      1970-01-01   None>

S= GSPC IFIL= ~/notebooks/csv/GSPC.csv
read_csv dfData <class 'pandas.core.frame.DataFrame'> length 1447

start_date <class 'pandas.tslib.Timestamp'> 2010-01-04 00:00:00
end_date <class 'pandas.tslib.Timestamp'> 2015-10-01 00:00:00
ac_date <class 'pandas.tslib.Timestamp'> 2015-10-02 00:00:00
S= GDAXI IFIL= ~/notebooks/csv/GDAXI.csv
read_csv dfData <class 'pandas.core.frame.DataFrame'> length 1470

start_date <class 'pandas.tslib.Timestamp'> 2010-01-04 00:00:00
end_date <class 'pandas.tslib.Timestamp'> 2015-10-01 00:00:00
ac_date <class 'pandas.tslib.Timestamp'> 2015-10-02 00:00:00
S= HSI IFIL= ~/notebooks/csv/HSI.csv
read_csv dfData <class 'pandas.core.frame.DataFrame'> length 1441

<.... skipping ... lots of lines of debugging output....>

Now calling daily_bar_writer
returned from daily_bar_writer
calling asset_db_writer
dfMetadata <class 'pandas.core.frame.DataFrame'>
  start_date   end_date auto_close_date symbol exchange
0 2010-01-04 2015-10-01      2015-10-02   GSPC    YAHOO
1 2010-01-04 2015-10-01      2015-10-02  GDAXI    YAHOO
2 2010-01-04 2015-09-30      2015-10-01    HSI    YAHOO
3 2010-01-04 2015-10-01      2015-10-02    AOI    YAHOO
4 2010-01-04 2015-10-01      2015-10-02    DJI    YAHOO
5 2010-01-04 2015-10-01      2015-10-02    NYA    YAHOO
6 2010-01-04 2015-10-01      2015-10-02   N225    YAHOO

symbol_map <class 'pandas.core.series.Series'>
symbol
GSPC     0
GDAXI    1
HSI      2
AOI      3
DJI      4
NYA      5
N225     6
dtype: int64


returned from asset_db_writer
calling adjustment_writer
returned from adjustment_writer
now leaving ingest function


Now, with the debugging print statements still turned on, let's confirm that zipline can find our bundle:



-> zipline bundles
entering viacsv.  tuSymbols= ('GSPC', 'GDAXI', 'HSI', 'AOI', 'DJI', 'NYA', 'N225')
about to return ingest function
csv 2016-10-04 12:52:18.383110
quandl <no ingestions>
quantopian-quandl 2016-09-09 13:16:35.114238


Let's write a quick test zipline algo to see if we can use this bundle:



-> cat test_csv.py
from zipline.api import order_target_percent, get_datetime

def initialize(context):
    pass

def handle_data(context, data):
    iNumEqt = len(data)
    print "get_datetime", get_datetime(), "iNumEqt=", iNumEqt
    fW = 1.0 / iNumEqt  # equal weight across all equities in the bundle
    for S in data:
        print "Ordering", fW, " of ", S.symbol
        order_target_percent(S, fW)

Now run it:



-> zipline run -f test_csv.py --bundle csv -s 2010-1-4 -e 2010-1-7
entering viacsv.  tuSymbols= ('GSPC', 'GDAXI', 'HSI', 'AOI', 'DJI', 'NYA', 'N225')
about to return ingest function
entering machina.  tuSymbols= ('MSPY',)
about to return ingest function
get_datetime 2010-01-04 21:00:00+00:00 iNumEqt= 7
Ordering 0.142857142857  of  GSPC
Ordering 0.142857142857  of  GDAXI
Ordering 0.142857142857  of  HSI
Ordering 0.142857142857  of  AOI
Ordering 0.142857142857  of  DJI
Ordering 0.142857142857  of  NYA
Ordering 0.142857142857  of  N225
get_datetime 2010-01-05 21:00:00+00:00 iNumEqt= 7
Ordering 0.142857142857  of  GSPC
Ordering 0.142857142857  of  GDAXI
Ordering 0.142857142857  of  HSI
Ordering 0.142857142857  of  AOI
Ordering 0.142857142857  of  DJI
Ordering 0.142857142857  of  NYA
Ordering 0.142857142857  of  N225
get_datetime 2010-01-06 21:00:00+00:00 iNumEqt= 7
Ordering 0.142857142857  of  GSPC
Ordering 0.142857142857  of  GDAXI
Ordering 0.142857142857  of  HSI
Ordering 0.142857142857  of  AOI
Ordering 0.142857142857  of  DJI
Ordering 0.142857142857  of  NYA
Ordering 0.142857142857  of  N225
get_datetime 2010-01-07 21:00:00+00:00 iNumEqt= 7
Ordering 0.142857142857  of  GSPC
Ordering 0.142857142857  of  GDAXI
Ordering 0.142857142857  of  HSI
Ordering 0.142857142857  of  AOI
Ordering 0.142857142857  of  DJI
Ordering 0.142857142857  of  NYA
Ordering 0.142857142857  of  N225
[2016-10-04 13:08:28.283818] INFO: Performance: Simulated 4 trading days out of 4.
[2016-10-04 13:08:28.283973] INFO: Performance: first open: 2010-01-04 14:31:00+00:00
[2016-10-04 13:08:28.284064] INFO: Performance: last close: 2010-01-07 21:00:00+00:00

Success!  We have created a custom bundle using local CSV files and confirmed our zipline algo can read it.

You may also be interested in creating a data bundle with 1-minute bar data and running a zipline algo.