Skip to content

Commit

Permalink
Added sampling function and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jlaura committed Jul 2, 2014
1 parent b2d51db commit 910567c
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 32 deletions.
37 changes: 25 additions & 12 deletions fisher_jenks/fj_refactored.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
import numpy
import numpy as np
import time
import multiprocessing
import ctypes
import warnings

#Suppress the divide by zero errors
warnings.filterwarnings('ignore', category=RuntimeWarning)
numpy.set_printoptions(linewidth = 200, suppress = True)
np.set_printoptions(linewidth = 200, suppress = True)

def fj_generate_sample(y, pct=0.10, random=True):
n = y.size
if random:
choicevector = np.arange(n)
ids = np.random.choice(choicevector,n * pct,replace=False)
#ids = np.random.random_integers(0, n - 1, n * pct)
else:
ids = np.arange(int(n*pct))
yr = y[ids]
yr[-1] = max(y) # make sure we have the upper bound
yr[0] = min(y) # make sure we have the min
return yr

def fisher_jenks(values, classes=5, cores=None, sort=True):
'''Fisher-Jenks Optimal Partitioning of an ordered array into k classes
Expand Down Expand Up @@ -46,25 +59,25 @@ class and the last value the start of the last class.
def allocate(values, classes):
'''This function allocates memory for the variance matrix, error matrix,
and pivot matrix. It also moves the variance matrix and error matrix from
numpy types to a ctypes, shared memory array.'''
np types to a ctypes, shared memory array.'''

numClass = classes
numVal = len(values)

varCtypes = multiprocessing.RawArray(ctypes.c_double, numVal*numVal)
varMat = numpy.frombuffer(varCtypes)
varMat = np.frombuffer(varCtypes)
varMat.shape = (numVal,numVal)

for x in range(0,len(values)):
varMat[x] = values[:]
varMat[x][0:x] = 0
print varMat
errCtypes = multiprocessing.RawArray(ctypes.c_double, classes*numVal)
errorMat = numpy.frombuffer(errCtypes)
errorMat = np.frombuffer(errCtypes)
errorMat.shape = (classes, numVal)

pivotShape = (classes, numVal)
pivotMat = numpy.ndarray(pivotShape, dtype=numpy.float)
pivotMat = np.ndarray(pivotShape, dtype=np.float)

#Initialize the arrays as globals.
initArr(varMat, errorMat)
Expand All @@ -88,8 +101,8 @@ def fj(sharedVar,i, values, start):
'''This function facilitates passing multiple rows to each process and
then performing multiple vector calculations along individual rows.'''
arr = sharedVar
arr[i] = numpy.apply_along_axis(calcVar, 1, arr[i], len(values))
arr[i][numpy.isnan(arr[i])] = 0
arr[i] = np.apply_along_axis(calcVar, 1, arr[i], len(values))
arr[i][np.isnan(arr[i])] = 0

def calcVar(arrRow, lenValues):
'''This function calculates the diameter matrix. It is called by fj.
Expand All @@ -98,15 +111,15 @@ def calcVar(arrRow, lenValues):
of elements summed for each index.'''

lenN = (arrRow != 0).sum()
n = numpy.arange(1, lenN+1)
n = np.arange(1, lenN+1)

if lenN != lenValues:
n.resize(arrRow.shape[0])
n[arrRow.shape[0]-lenN:] = n[:lenN-arrRow.shape[0]]
n[0:arrRow.shape[0]-lenN] = 0
print arrRow
return ((numpy.cumsum(numpy.square(arrRow))) - \
((numpy.cumsum(arrRow)*numpy.cumsum(arrRow)) / (n)))
return ((np.cumsum(np.square(arrRow))) - \
((np.cumsum(arrRow)*np.cumsum(arrRow)) / (n)))

def err(row,y,step, lenrow):
'''This function computes the error on a segment of each error row, from the error matrix.
Expand All @@ -119,7 +132,7 @@ def err(row,y,step, lenrow):
stop = lenrow-1
while y <= stop:
print sharedVar[:,y+row][row:y+row+1]
sharedErrRow[y] = numpy.amin(sharedErr[row-1][row-1:y+row] + sharedVar[:,y+row][row:y+row+1])
sharedErrRow[y] = np.amin(sharedErr[row-1][row-1:y+row] + sharedVar[:,y+row][row:y+row+1])
y+=1

if sort:
Expand Down
98 changes: 78 additions & 20 deletions fisher_jenks/test_fj.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,82 @@
import math
import sys
import time
import numpy
from fj_refactored import fisher_jenks

cores = [1,2,4,16,32]
classes = [5,6,7]
data_sizes = [500, 1000, 2500, 5000, 7500, 10000, 12500, 15000, 17500, 20000, 22500, 25000]

for c in cores:
for d in data_sizes:
for k in classes:
data = numpy.random.ranf(size=d)
try:
import numpy as np
from fj_refactored import fisher_jenks, fj_generate_sample


def testfull():
"""
Tests the fully enumerated Fisher-Jenks implementation
"""
cores = [1,2,4,16,32]
classes = [5,6,7]
data_sizes = [500, 1000, 2500, 5000, 7500, 10000, 12500, 15000, 17500, 20000, 22500, 25000]

for c in cores:
for d in data_sizes:
for k in classes:
data = np.random.ranf(size=d)
try:
t1 = time.time()
#wrapped in try since we will blow out RAM at some point
classification = fisher_jenks(data, k, c)
t2 = time.time()
print "Processed {0} data points in {1} classes using {2} cores. Total time: {3}".format(d, k, c, t2-t1)
data = None
except KeyboardInterrupt:
print "Aborting"
sys.exit(1)
except:
print "FAILURE: {0} data points.".format(d)


def testsample():
"""
Tests the sampled Fisher-Jenks implementation
"""
cores = [1,2,4,16,32]
classes = [5,6,7]
data_sizes = [10000, 20000, 40000, 80000, 160000, 320000, 640000, 1280000,
2560000, 5120000, 10240000, 20480000, 40960000, 81920000,
163840000, 327680000, 655360000]
for c in cores:
for d in data_sizes:
for k in classes:
#Generate the test data and save to disk
data = np.random.ranf(size=d)
nobs = len(data)
np.save('testfile.npy', data)
data = None

#Compute the sample size as the sqrt of nobs
sqrt = math.sqrt(nobs)
if sqrt > 40000:
sqrt = 40000
pct = sqrt / float(d)

#Load the data back into memory as a mmapped file
f = np.load('testfile.npy', mmap_mode='r+')
t1 = time.time()
#wrapped in try since we will blow out RAM at some point
classification = fisher_jenks(data, k, c)
data = fj_generate_sample(f, pct=pct)
t2 = time.time()
print "Processed {0} data points in {1} classes using {2} cores. Total time: {3}".format(d, k, c, t2-t1)
except KeyboardInterrupt:
print "Aborting"
sys.exit(1)
except:
print "FAILURE: {0} data points.".format(d)
print "Randomly sampling {0} percent of {1} observations for a run size of {2} observations took {3} seconds.".format(pct, nobs, sqrt, t2 - t1)
try:
t1 = time.time()
#wrapped in try since we will blow out RAM at some point
classification = fisher_jenks(data, k, c)
t2 = time.time()
print "Processed {0} data points in {1} classes using {2} cores. Total time: {3}".format(d, k, c, t2-t1)
except KeyboardInterrupt:
print "Aborting"
sys.exit(1)
except:
print "FAILURE: {0} data points.".format(d)


if __name__ =='__main__':
#Test the fully enumerated FJ
testfull()

#Test FJ using sampling
testsample()

0 comments on commit 910567c

Please sign in to comment.