Merge pull request #5 from afiedler/handle-large-partitions
Handle large partitions
afiedler committed Oct 10, 2015
2 parents f6f1fcf + 985ab2b commit fd4f719
Showing 2 changed files with 125 additions and 8 deletions.
src/tstables/tests/test_tstable_file.py: 109 additions & 0 deletions
@@ -10,12 +10,15 @@
from cStringIO import StringIO
import os
import pandas
import mock
import numpy

# Class to define record structure
class Price(tables.IsDescription):
    timestamp = tables.Int64Col(pos=0)
    price = tables.Int32Col(pos=1)


class TsTableFileTestCase(unittest.TestCase):

    def setUp(self):
@@ -104,6 +107,112 @@ def test_load_same_timestamp(self):
        for idx,p in enumerate(rows_read['price']):
            self.assertEqual(p,rows['price'][idx])


    @mock.patch.object(tstables.TsTable, 'MAX_FULL_PARTITION_READ_SIZE', 1)
    def test_load_same_timestamp_using_read_where(self):

        # Test data that is multiple rows with the same timestamp. With
        # MAX_FULL_PARTITION_READ_SIZE patched down to 1, every read is forced through
        # the Table.read_where path instead of the full in-memory read.
        csv = u"""2014-05-05T01:01:01.100Z,1
2014-05-05T01:01:01.100Z,2
2014-05-05T01:01:01.100Z,3
2014-05-05T01:01:01.100Z,4
2014-05-05T01:01:01.100Z,5"""

        sfile = StringIO(csv)

        # Note: don't need the 'timestamp' column in the dtype param here because it will become the DatetimeIndex.
        rows = pandas.read_csv(sfile,parse_dates=[0],index_col=0,names=['timestamp', 'price'],dtype={'price': 'i4'})

        ts = self.h5_file.create_ts('/','EURUSD',description=Price)
        ts.append(rows)

        # Inspect to ensure that data has been stored correctly
        tbl = ts.root_group.y2014.m05.d05.ts_data

        self.assertEqual(tbl.nrows,5)

        # Fetch rows over a larger range
        rows_read = ts.read_range(datetime.datetime(2014,5,5,tzinfo=pytz.utc),datetime.datetime(2014,5,6,tzinfo=pytz.utc))

        # Confirm equality
        for idx,p in enumerate(rows_read['price']):
            self.assertEqual(p,rows['price'][idx])

        # Fetch rows over the smallest possible range
        rows_read = ts.read_range(datetime.datetime(2014,5,5,1,1,1,100*1000,tzinfo=pytz.utc),
                                  datetime.datetime(2014,5,5,1,1,1,100*1000,tzinfo=pytz.utc))

        # Confirm equality
        for idx,p in enumerate(rows_read['price']):
            self.assertEqual(p,rows['price'][idx])


    @mock.patch.object(tstables.TsTable, 'MAX_FULL_PARTITION_READ_SIZE', 1)
    @mock.patch.object(tables.Table, 'read_where')
    @mock.patch.object(tables.Table, 'read')
    def test_read_using_read_where(self, mock_read, mock_read_where):

        csv = u"""2014-05-05T01:01:01.100Z,1
2014-05-05T01:01:01.100Z,2
2014-05-05T01:01:01.100Z,3
2014-05-05T01:01:01.100Z,4
2014-05-05T01:01:01.100Z,5"""

        sfile = StringIO(csv)

        # Note: don't need the 'timestamp' column in the dtype param here because it will become the DatetimeIndex.
        rows = pandas.read_csv(sfile,parse_dates=[0],index_col=0,names=['timestamp', 'price'],dtype={'price': 'i4'})

        ts = self.h5_file.create_ts('/','EURUSD',description=Price)
        ts.append(rows)

        # Inspect to ensure that data has been stored correctly
        tbl = ts.root_group.y2014.m05.d05.ts_data

        self.assertEqual(tbl.nrows,5)

        # Table.read_where is a mock, so we need to give it a return value
        mock_read_where.return_value = numpy.ndarray(shape=0,dtype=[('timestamp', '<i8'), ('price', '<i4')])

        # Fetch rows over a larger range
        rows_read = ts.read_range(datetime.datetime(2014,5,5,tzinfo=pytz.utc),datetime.datetime(2014,5,6,tzinfo=pytz.utc))

        # With the size threshold patched to 1, the read must go through Table.read_where
        self.assertTrue(mock_read_where.called)
        self.assertFalse(mock_read.called)

    @mock.patch.object(tables.Table, 'read_where')
    @mock.patch.object(tables.Table, 'read')
    def test_read_using_read(self, mock_read, mock_read_where):

        csv = u"""2014-05-05T01:01:01.100Z,1
2014-05-05T01:01:01.100Z,2
2014-05-05T01:01:01.100Z,3
2014-05-05T01:01:01.100Z,4
2014-05-05T01:01:01.100Z,5"""

        sfile = StringIO(csv)

        # Note: don't need the 'timestamp' column in the dtype param here because it will become the DatetimeIndex.
        rows = pandas.read_csv(sfile,parse_dates=[0],index_col=0,names=['timestamp', 'price'],dtype={'price': 'i4'})

        ts = self.h5_file.create_ts('/','EURUSD',description=Price)
        ts.append(rows)

        # Inspect to ensure that data has been stored correctly
        tbl = ts.root_group.y2014.m05.d05.ts_data

        self.assertEqual(tbl.nrows,5)

        # Table.read is a mock, so we need to give it a return value
        mock_read.return_value = numpy.ndarray(shape=0,dtype=[('timestamp', '<i8'), ('price', '<i4')])

        # Fetch rows over a larger range
        rows_read = ts.read_range(datetime.datetime(2014,5,5,tzinfo=pytz.utc),datetime.datetime(2014,5,6,tzinfo=pytz.utc))

        # With the default size threshold, this partition is small enough to be read fully
        # into memory via Table.read
        self.assertFalse(mock_read_where.called)
        self.assertTrue(mock_read.called)


    def __load_csv_data(self,csv):
        sfile = StringIO(csv)

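Side note on the duplicate-timestamp fixtures above: the 'left' and 'right' sides passed to numpy.searchsorted are what make the returned slice cover every row whose timestamp equals a range boundary. A minimal sketch (the array below is illustrative, not taken from the test data):

import numpy

# Five identical timestamps, as in the CSV fixtures above, plus one later row.
ts = numpy.array([100, 100, 100, 100, 100, 200], dtype='i8')

lo = numpy.searchsorted(ts, 100, side='left')   # 0: first row with ts >= 100
hi = numpy.searchsorted(ts, 100, side='right')  # 5: one past the last row with ts <= 100

# ts[lo:hi] contains all five rows that share the timestamp, which is what
# the smallest-possible-range read in the tests relies on.
print(ts[lo:hi])  # [100 100 100 100 100]
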
src/tstables/tstable.py: 16 additions & 8 deletions
@@ -12,6 +12,9 @@ class TsTable:
    # Partition size is one day (in milliseconds)
    PARTITION_SIZE = numpy.int64(86400000)

    # The maximum partition size (in bytes, estimated as rowsize * nrows) to read completely
    # into memory; larger partitions are queried with Table.read_where instead.
    MAX_FULL_PARTITION_READ_SIZE = 25*1e6

    def __init__(self,pt_file,root_group,description,title="",filters=None,
                 expectedrows_per_partition=10000,chunkshape=None,byteorder=None):
        self.file = pt_file
@@ -98,14 +101,19 @@ def __fetch_rows_from_partition(self,partition_date,start_dt,end_dt):
            # If the partition group is missing, then return an empty array
            return numpy.ndarray(shape=0,dtype=self.__v_dtype())

        # It is faster to fetch the entire partition into memory and process it with NumPy than to use Table.read_where
        p_data = d_group.ts_data.read()
        start_ts = self.__dt_to_ts(start_dt)
        end_ts = self.__dt_to_ts(end_dt)
        start_idx = numpy.searchsorted(p_data['timestamp'], start_ts, side='left')
        end_idx = numpy.searchsorted(p_data['timestamp'], end_ts, side='right')

        return p_data[start_idx:end_idx]
        # It is faster to fetch the entire partition into memory and process it with NumPy than to
        # use Table.read_where. However, Table.read_where might be needed for very large partitions
        # where memory usage is a concern.
        if d_group.ts_data.rowsize * d_group.ts_data.nrows < TsTable.MAX_FULL_PARTITION_READ_SIZE:
            p_data = d_group.ts_data.read()
            start_ts = self.__dt_to_ts(start_dt)
            end_ts = self.__dt_to_ts(end_dt)
            start_idx = numpy.searchsorted(p_data['timestamp'], start_ts, side='left')
            end_idx = numpy.searchsorted(p_data['timestamp'], end_ts, side='right')
            return p_data[start_idx:end_idx]
        else:
            return d_group.ts_data.read_where('(timestamp >= {0}) & (timestamp <= {1})'.format(
                self.__dt_to_ts(start_dt),self.__dt_to_ts(end_dt)))

    def __fetch_first_table(self):
        y_group = self.root_group._f_list_nodes()[0]
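For readers skimming the diff, the new branch logic can be sketched in isolation. A minimal sketch, assuming tbl is a PyTables Table whose Int64 'timestamp' column is sorted ascending; the names fetch_range, tbl, and max_bytes are illustrative, not part of the library, and the default mirrors the 25 MB threshold added above:

import numpy

def fetch_range(tbl, start_ts, end_ts, max_bytes=25 * 1000 * 1000):
    # Estimate the partition's in-memory size from PyTables metadata.
    if tbl.rowsize * tbl.nrows < max_bytes:
        # Small partition: read everything, then binary-search the sorted
        # timestamp column. 'left'/'right' make both endpoints inclusive.
        data = tbl.read()
        lo = numpy.searchsorted(data['timestamp'], start_ts, side='left')
        hi = numpy.searchsorted(data['timestamp'], end_ts, side='right')
        return data[lo:hi]
    else:
        # Large partition: let PyTables evaluate the condition chunk by
        # chunk so the whole table never has to sit in memory at once.
        return tbl.read_where(
            '(timestamp >= {0}) & (timestamp <= {1})'.format(start_ts, end_ts))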
