diff --git a/samples/python2/digits.py b/samples/python2/digits.py index f4bb0a5cd0..88e9fb8adf 100644 --- a/samples/python2/digits.py +++ b/samples/python2/digits.py @@ -1,78 +1,130 @@ ''' -Neural network digit recognition sample. +SVM and KNearest digit recognition. + +Sample loads a dataset of handwritten digits from 'digits.png'. +Then it trains SVM and KNearest classifiers on it and evaluates +their accuracy. Moment-based image deskew is used to improve +the recognition accuracy. + Usage: digits.py - - Sample loads a dataset of handwritten digits from 'digits.png'. - Then it trains a neural network classifier on it and evaluates - its classification accuracy. ''' import numpy as np import cv2 -from common import mosaic - -def unroll_responses(responses, class_n): - '''[1, 0, 2, ...] -> [[0, 1, 0], [1, 0, 0], [0, 0, 1], ...]''' - sample_n = len(responses) - new_responses = np.zeros((sample_n, class_n), np.float32) - new_responses[np.arange(sample_n), responses] = 1 - return new_responses - +from multiprocessing.pool import ThreadPool +from common import clock, mosaic SZ = 20 # size of each digit is SZ x SZ CLASS_N = 10 -digits_img = cv2.imread('digits.png', 0) -# prepare dataset -h, w = digits_img.shape -digits = [np.hsplit(row, w/SZ) for row in np.vsplit(digits_img, h/SZ)] -digits = np.float32(digits).reshape(-1, SZ*SZ) -N = len(digits) -labels = np.repeat(np.arange(CLASS_N), N/CLASS_N) +def load_digits(fn): + print 'loading "%s" ...' 
% fn + digits_img = cv2.imread(fn, 0) + h, w = digits_img.shape + digits = [np.hsplit(row, w/SZ) for row in np.vsplit(digits_img, h/SZ)] + digits = np.array(digits).reshape(-1, SZ, SZ) + labels = np.repeat(np.arange(CLASS_N), len(digits)/CLASS_N) + return digits, labels -# split it onto train and test subsets -shuffle = np.random.permutation(N) -train_n = int(0.9*N) -digits_train, digits_test = np.split(digits[shuffle], [train_n]) -labels_train, labels_test = np.split(labels[shuffle], [train_n]) +def deskew(img): + m = cv2.moments(img) + if abs(m['mu02']) < 1e-2: + return img.copy() + skew = m['mu11']/m['mu02'] + M = np.float32([[1, skew, -0.5*SZ*skew], [0, 1, 0]]) + img = cv2.warpAffine(img, M, (SZ, SZ), flags=cv2.WARP_INVERSE_MAP | cv2.INTER_LINEAR) + return img -# train model -model = cv2.ANN_MLP() -layer_sizes = np.int32([SZ*SZ, 25, CLASS_N]) -model.create(layer_sizes) -params = dict( term_crit = (cv2.TERM_CRITERIA_COUNT, 100, 0.01), - train_method = cv2.ANN_MLP_TRAIN_PARAMS_BACKPROP, - bp_dw_scale = 0.001, - bp_moment_scale = 0.0 ) -print 'training...' 
-labels_train_unrolled = unroll_responses(labels_train, CLASS_N) -model.train(digits_train, labels_train_unrolled, None, params=params) -model.save('dig_nn.dat') -model.load('dig_nn.dat') +class StatModel(object): + def load(self, fn): + self.model.load(fn) + def save(self, fn): + self.model.save(fn) -def evaluate(model, samples, labels): - '''Evaluates classifier preformance on a given labeled samples set.''' - ret, resp = model.predict(samples) - resp = resp.argmax(-1) - error_mask = (resp == labels) - accuracy = error_mask.mean() - return accuracy, error_mask +class KNearest(StatModel): + def __init__(self, k = 3): + self.k = k + self.model = cv2.KNearest() -# evaluate model -train_accuracy, _ = evaluate(model, digits_train, labels_train) -print 'train accuracy: ', train_accuracy -test_accuracy, test_error_mask = evaluate(model, digits_test, labels_test) -print 'test accuracy: ', test_accuracy + def train(self, samples, responses): + self.model = cv2.KNearest() + self.model.train(samples, responses) -# visualize test results -vis = [] -for img, flag in zip(digits_test, test_error_mask): - img = np.uint8(img).reshape(SZ, SZ) - img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) - if not flag: - img[...,:2] = 0 - vis.append(img) -vis = mosaic(25, vis) -cv2.imshow('test', vis) -cv2.waitKey() + def predict(self, samples): + retval, results, neigh_resp, dists = self.model.find_nearest(samples, self.k) + return results.ravel() + +class SVM(StatModel): + def __init__(self, C = 1, gamma = 0.5): + self.params = dict( kernel_type = cv2.SVM_RBF, + svm_type = cv2.SVM_C_SVC, + C = C, + gamma = gamma ) + self.model = cv2.SVM() + + def train(self, samples, responses): + self.model = cv2.SVM() + self.model.train(samples, responses, params = self.params) + + def predict(self, samples): + return self.model.predict_all(samples).ravel() + + +def evaluate_model(model, digits, samples, labels): + resp = model.predict(samples) + err = (labels != resp).mean() + print 'error: %.2f %%' % 
(err*100) + + confusion = np.zeros((10, 10), np.int32) + for i, j in zip(labels, resp): + confusion[i, j] += 1 + print 'confusion matrix:' + print confusion + print + + vis = [] + for img, flag in zip(digits, resp == labels): + img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) + if not flag: + img[...,:2] = 0 + vis.append(img) + return mosaic(25, vis) + + +if __name__ == '__main__': + print __doc__ + + digits, labels = load_digits('digits.png') + + print 'preprocessing...' + # shuffle digits + rand = np.random.RandomState(12345) + shuffle = rand.permutation(len(digits)) + digits, labels = digits[shuffle], labels[shuffle] + + digits2 = map(deskew, digits) + samples = np.float32(digits2).reshape(-1, SZ*SZ) / 255.0 + + train_n = int(0.9*len(samples)) + cv2.imshow('test set', mosaic(25, digits[train_n:])) + digits_train, digits_test = np.split(digits2, [train_n]) + samples_train, samples_test = np.split(samples, [train_n]) + labels_train, labels_test = np.split(labels, [train_n]) + + + print 'training KNearest...' + model = KNearest(k=1) + model.train(samples_train, labels_train) + vis = evaluate_model(model, digits_test, samples_test, labels_test) + cv2.imshow('KNearest test', vis) + + print 'training SVM...' + model = SVM(C=4.66, gamma=0.08) + model.train(samples_train, labels_train) + vis = evaluate_model(model, digits_test, samples_test, labels_test) + cv2.imshow('SVM test', vis) + print 'saving SVM as "digits_svm.dat"...' + model.save('digits_svm.dat') + + cv2.waitKey(0) diff --git a/samples/python2/digits_adjust.py b/samples/python2/digits_adjust.py new file mode 100644 index 0000000000..3e1b6b76e6 --- /dev/null +++ b/samples/python2/digits_adjust.py @@ -0,0 +1,136 @@ +''' +Digit recognition adjustment. +Grid search is used to find the best parameters for SVM and KNearest classifiers. 
+SVM adjustment follows the guidelines given in +http://www.csie.ntu.edu.tw/~cjlin/papers/guide/guide.pdf + +Threading or cloud computing (with http://www.picloud.com/) may be used +to speed up the computation. + +Usage: + digits_adjust.py [--model {svm|knearest}] [--cloud] [--env NAME] + + --model {svm|knearest} - select the classifier (SVM is the default) + --cloud - use PiCloud computing platform (for SVM only) + --env NAME - cloud environment name + +''' +# TODO dataset preprocessing in cloud +# TODO cloud env setup tutorial + +import numpy as np +import cv2 +from multiprocessing.pool import ThreadPool + +from digits import * + +def cross_validate(model_class, params, samples, labels, kfold = 3, pool = None): + n = len(samples) + folds = np.array_split(np.arange(n), kfold) + def f(i): + model = model_class(**params) + test_idx = folds[i] + train_idx = list(folds) + train_idx.pop(i) + train_idx = np.hstack(train_idx) + train_samples, train_labels = samples[train_idx], labels[train_idx] + test_samples, test_labels = samples[test_idx], labels[test_idx] + model.train(train_samples, train_labels) + resp = model.predict(test_samples) + score = (resp != test_labels).mean() + print ".", + return score + if pool is None: + scores = map(f, xrange(kfold)) + else: + scores = pool.map(f, xrange(kfold)) + return np.mean(scores) + +def adjust_KNearest(samples, labels): + print 'adjusting KNearest ...' 
+ best_err, best_k = np.inf, -1 + for k in xrange(1, 9): + err = cross_validate(KNearest, dict(k=k), samples, labels) + if err < best_err: + best_err, best_k = err, k + print 'k = %d, error: %.2f %%' % (k, err*100) + best_params = dict(k=best_k) + print 'best params:', best_params + return best_params + +def adjust_SVM(samples, labels, usecloud=False, cloud_env=''): + Cs = np.logspace(0, 5, 10, base=2) + gammas = np.logspace(-7, -2, 10, base=2) + scores = np.zeros((len(Cs), len(gammas))) + scores[:] = np.nan + + if usecloud: + try: + import cloud + except ImportError: + print 'cloud module is not installed' + usecloud = False + if usecloud: + print 'uploading dataset to cloud...' + np.savez('train.npz', samples=samples, labels=labels) + cloud.files.put('train.npz') + + print 'adjusting SVM (may take a long time) ...' + def f(job): + i, j = job + params = dict(C = Cs[i], gamma=gammas[j]) + score = cross_validate(SVM, params, samples, labels) + return i, j, score + def fcloud(job): + i, j = job + cloud.files.get('train.npz') + npz = np.load('train.npz') + params = dict(C = Cs[i], gamma=gammas[j]) + score = cross_validate(SVM, params, npz['samples'], npz['labels']) + return i, j, score + + if usecloud: + jids = cloud.map(fcloud, np.ndindex(*scores.shape), _env=cloud_env, _profile=True) + ires = cloud.iresult(jids) + else: + pool = ThreadPool(processes=cv2.getNumberOfCPUs()) + ires = pool.imap_unordered(f, np.ndindex(*scores.shape)) + + for count, (i, j, score) in enumerate(ires): + scores[i, j] = score + print '%d / %d (best error: %.2f %%, last: %.2f %%)' % (count+1, scores.size, np.nanmin(scores)*100, score*100) + print scores + + i, j = np.unravel_index(scores.argmin(), scores.shape) + best_params = dict(C = Cs[i], gamma=gammas[j]) + print 'best params:', best_params + print 'best error: %.2f %%' % (scores.min()*100) + return best_params + +if __name__ == '__main__': + import getopt + import sys + + print __doc__ + + args, _ = getopt.getopt(sys.argv[1:], '', 
['model=', 'cloud', 'env=']) + args = dict(args) + args.setdefault('--model', 'svm') + args.setdefault('--env', '') + if args['--model'] not in ['svm', 'knearest']: + print 'unknown model "%s"' % args['--model'] + sys.exit(1) + + digits, labels = load_digits('digits.png') + shuffle = np.random.permutation(len(digits)) + digits, labels = digits[shuffle], labels[shuffle] + digits2 = map(deskew, digits) + samples = np.float32(digits2).reshape(-1, SZ*SZ) / 255.0 + + t = clock() + if args['--model'] == 'knearest': + adjust_KNearest(samples, labels) + else: + adjust_SVM(samples, labels, usecloud='--cloud' in args, cloud_env = args['--env']) + print 'work time: %f s' % (clock() - t) + \ No newline at end of file diff --git a/samples/python2/digits_video.py b/samples/python2/digits_video.py new file mode 100644 index 0000000000..0eddadf854 --- /dev/null +++ b/samples/python2/digits_video.py @@ -0,0 +1,74 @@ +import numpy as np +import cv2 +import digits +import os +import video +from common import mosaic + + + +def main(): + cap = video.create_capture() + + classifier_fn = 'digits_svm.dat' + if not os.path.exists(classifier_fn): + print '"%s" not found, run digits.py first' % classifier_fn + return + + model = digits.SVM() + model.load('digits_svm.dat') + + SZ = 20 + + while True: + ret, frame = cap.read() + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + + bin = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY_INV, 31, 10) + bin = cv2.medianBlur(bin, 3) + contours, heirs = cv2.findContours( bin.copy(), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) + rects = map(cv2.boundingRect, contours) + valid_flags = [ 16 <= h <= 64 and w <= 1.2*h for x, y, w, h in rects] + + for i, cnt in enumerate(contours): + if not valid_flags[i]: + continue + _, _, _, outer_i = heirs[0, i] + if outer_i >=0 and valid_flags[outer_i]: + continue + x, y, w, h = rects[i] + cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0)) + sub = bin[y:,x:][:h,:w] + #sub = 
~cv2.equalizeHist(sub) + #_, sub_bin = cv2.threshold(sub, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU) + + s = 1.5*float(h)/SZ + m = cv2.moments(sub) + m00 = m['m00'] + if m00/255 < 0.1*w*h or m00/255 > 0.9*w*h: + continue + + c1 = np.float32([m['m10'], m['m01']]) / m00 + c0 = np.float32([SZ/2, SZ/2]) + t = c1 - s*c0 + A = np.zeros((2, 3), np.float32) + A[:,:2] = np.eye(2)*s + A[:,2] = t + sub1 = cv2.warpAffine(sub, A, (SZ, SZ), flags=cv2.WARP_INVERSE_MAP | cv2.INTER_LINEAR) + sub1 = digits.deskew(sub1) + if x+w+SZ < frame.shape[1] and y+SZ < frame.shape[0]: + frame[y:,x+w:][:SZ, :SZ] = sub1[...,np.newaxis] + + sample = np.float32(sub1).reshape(1,SZ*SZ) / 255.0 + digit = model.predict(sample)[0] + + cv2.putText(frame, '%d'%digit, (x, y), cv2.FONT_HERSHEY_PLAIN, 1.0, (200, 0, 0), thickness = 1) + + + cv2.imshow('frame', frame) + cv2.imshow('bin', bin) + if cv2.waitKey(1) == 27: + break + +if __name__ == '__main__': + main()