[WIP] parallel perceptron training #10

Open · wants to merge 1 commit into master
55 changes: 55 additions & 0 deletions seqlearn/_utils/shards.py
@@ -0,0 +1,55 @@
from __future__ import absolute_import
import numpy as np

class SequenceShards(object):
Owner:

I still have to read this through, but can't this "sharding" be reduced to SequenceKFold somehow? It looks awfully similar.

Collaborator Author:

I think "SequenceKFold" can (and should) be implemented using "sharding", but not the other way around. It is a copy-pasted version of SequenceKFold indeed, with some unnecessary stuff removed.

"""Sequence-aware (repeated) splitter.

Uses a greedy heuristic to partition input sequences into sets with roughly
equal numbers of samples, while keeping the sequences intact.

Parameters
----------
lengths : array-like of integers, shape (n_samples,)
Lengths of sequences, in the order in which they appear in the dataset.

n_folds : int, optional
Number of folds.

Returns
-------
A generator yielding (indices, length_indices) tuples.
"""

def __init__(self, lengths, n_folds):
self.lengths = lengths
self.n_folds = n_folds

def __iter__(self):
lengths = np.asarray(self.lengths, dtype=np.intp)
starts = np.cumsum(lengths) - lengths
n_samples = np.sum(lengths)

seq_ind = np.arange(len(lengths))

folds = [[] for _ in range(self.n_folds)]
samples_per_fold = np.zeros(self.n_folds, dtype=int)

# Greedy strategy: always append to the currently smallest fold
for i in seq_ind:
seq = (i, starts[i], starts[i] + lengths[i])
fold_idx = np.argmin(samples_per_fold)
folds[fold_idx].append(seq)
samples_per_fold[fold_idx] += lengths[i]

for f in folds:
mask = np.zeros(n_samples, dtype=bool)
lengths_mask = np.zeros(len(lengths), dtype=bool)
for i, start, end in f:
mask[start:end] = True
lengths_mask[i] = True
indices = np.where(mask)[0]
length_indices = np.where(lengths_mask)[0]
yield indices, length_indices

def __len__(self):
return self.n_folds
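
For illustration, here is a minimal usage sketch of SequenceShards; the sequence lengths below are invented for the example:

from seqlearn._utils.shards import SequenceShards

lengths = [4, 2, 3]  # three sequences with 4, 2 and 3 samples
for indices, length_indices in SequenceShards(lengths, n_folds=2):
    # `indices` selects the samples (rows of X / entries of y) in this shard;
    # `length_indices` selects the corresponding entries of `lengths`.
    print(indices, length_indices)

The greedy heuristic puts the 4-sample sequence in fold 0 and the 2- and 3-sample sequences in fold 1, so the shards hold 4 and 5 samples respectively while every sequence stays intact.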
261 changes: 246 additions & 15 deletions seqlearn/perceptron.py
@@ -1,14 +1,45 @@
# Copyright 2013 Lars Buitinck

from __future__ import division, print_function
from __future__ import division, print_function, absolute_import

import sys

from collections import namedtuple
import numpy as np

from .base import BaseSequenceClassifier
from ._utils import (atleast2d_or_csr, check_random_state, count_trans,
safe_sparse_dot)
from ._utils.shards import SequenceShards

_EpochState = namedtuple('EpochState', [
'w', 'w_trans', 'w_init', 'w_final',
'w_avg', 'w_trans_avg', 'w_init_avg', 'w_final_avg'])

class EpochState(_EpochState):
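"""Current perceptron weights (w, w_trans, w_init, w_final) together with their
running sums (w_avg, w_trans_avg, w_init_avg, w_final_avg) used for weight
averaging; instances are passed between training epochs."""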
def copy(self):
return self.__class__(*[np.copy(w) for w in self])

@classmethod
def initial(cls, n_classes, n_features):
w = np.zeros((n_classes, n_features), order='F')
w_trans = np.zeros((n_classes, n_classes))
w_init = np.zeros(n_classes)
w_final = np.zeros(n_classes)

w_avg = np.zeros_like(w)
w_trans_avg = np.zeros_like(w_trans)
w_init_avg = np.zeros_like(w_init)
w_final_avg = np.zeros_like(w_final)

return cls(w, w_trans, w_init, w_final,
w_avg, w_trans_avg, w_init_avg, w_final_avg)

def averages(self, k):
coef_ = self.w_avg / k
coef_init_ = self.w_init_avg / k
coef_trans_ = self.w_trans_avg / k
coef_final_ = self.w_final_avg / k
return coef_, coef_init_, coef_trans_, coef_final_


class StructuredPerceptron(BaseSequenceClassifier):
@@ -85,28 +116,19 @@ def fit(self, X, y, lengths):
end = np.cumsum(lengths)
start = end - lengths

w = np.zeros((n_classes, n_features), order='F')
w_trans = np.zeros((n_classes, n_classes))
w_init = np.zeros(n_classes)
w_final = np.zeros(n_classes)

w_avg = np.zeros_like(w)
w_trans_avg = np.zeros_like(w_trans)
w_init_avg = np.zeros_like(w_init)
w_final_avg = np.zeros_like(w_final)
state = EpochState.initial(n_classes, n_features)
(w, w_trans, w_init, w_final,
w_avg, w_trans_avg, w_init_avg, w_final_avg) = state

lr = self.learning_rate

sequence_ids = np.arange(lengths.shape[0])
rng = check_random_state(self.random_state)

for it in xrange(1, self.max_iter + 1):
if self.verbose:
print("Iteration ", it, end="... ")
sys.stdout.flush()

rng.shuffle(sequence_ids)

sequence_ids = rng.permutation(lengths.shape[0])
sum_loss = 0

for i in sequence_ids:
@@ -155,3 +177,212 @@ def fit(self, X, y, lengths):
self.classes_ = classes

return self


class OneEpochPerceptron(BaseSequenceClassifier):
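"""Structured perceptron that performs a single training epoch on one shard of
the data; ParallelStructuredPerceptron combines several of these via parameter
mixing."""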
def __init__(self, decode='viterbi', learning_rate=.1, random_state=None):
self.decode = decode
self.learning_rate = learning_rate
self.random_state = random_state

def fit(self, X, y, lengths, class_range):
"""Remember a set of training sequences.

Parameters
----------
X : {array-like, sparse matrix}, shape (n_samples, n_features)
Feature matrix of individual samples.

y : array-like, shape (n_samples,)
Target labels.

lengths : array-like of integers, shape (n_sequences,)
Lengths of the individual sequences in X, y. The sum of these
should be n_samples.

class_range : array-like of integers
Integer codes of the target classes (np.arange(n_classes)), as passed in by split_task.

Returns
-------
self : OneEpochPerceptron
"""
self.X = atleast2d_or_csr(X)
self.y = y
self.lengths = np.asarray(lengths)

self.class_range = class_range
self.Y_true = y.reshape(-1, 1) == class_range

self.rng = check_random_state(self.random_state)
self.end = np.cumsum(lengths)
self.start = self.end - lengths

return self

def transform(self, state):
"""Compute updated weights: do a single iteration over
data and return (new_state, sum_loss) tuple.

Parameters
----------
state : EpochState
Weighted mixture of parameters from all shards from the
previous epoch.

Returns
-------
(new_state, sum_loss) tuple.

"""
decode = self._get_decoder()

lr = self.learning_rate
Y_true = self.Y_true
n_classes = Y_true.shape[1]

rng = check_random_state(self.random_state)
sequence_ids = rng.permutation(self.lengths.shape[0])

sum_loss = 0
s = state.copy()

for i in sequence_ids:
X_i = self.X[self.start[i]:self.end[i]]
Score = safe_sparse_dot(X_i, s.w.T)
y_pred = decode(Score, s.w_trans, s.w_init, s.w_final)
y_t_i = self.y[self.start[i]:self.end[i]]
loss = (y_pred != y_t_i).sum()

if loss:
sum_loss += loss

Y_t_i = Y_true[self.start[i]:self.end[i]]
Y_pred = y_pred.reshape(-1, 1) == self.class_range
Y_pred = Y_pred.astype(np.float64)

Y_diff = Y_pred - Y_t_i
Y_diff *= lr
s.w[:] -= safe_sparse_dot(Y_diff.T, X_i)

t_trans = count_trans(y_t_i, n_classes)
p_trans = count_trans(y_pred, n_classes)
s.w_trans[:] -= lr * (p_trans - t_trans)
s.w_init[:] -= lr * (Y_pred[0] - Y_true[self.start[i]])
s.w_final[:] -= lr * (Y_pred[-1] - Y_true[self.end[i] - 1])

s.w_avg[:] += s.w
s.w_trans_avg[:] += s.w_trans
s.w_init_avg[:] += s.w_init
s.w_final_avg[:] += s.w_final

return s, sum_loss

@classmethod
def split_task(cls, X, y, lengths, n_jobs, **init_kwargs):
"""
Return a list of OneEpochPerceptron instances.
"""
X = atleast2d_or_csr(X)
lengths = np.asarray(lengths)

classes = np.unique(y)
class_range = np.arange(len(classes))

perceptrons = []
for shard, length_shard in SequenceShards(lengths, n_jobs):
perc = OneEpochPerceptron(**init_kwargs)
perc.fit(X[shard], y[shard], lengths[length_shard], class_range)
perceptrons.append(perc)

return perceptrons


def mixed_epoch_states(transform_results):
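"""Average the per-shard epoch states (a uniform-weight form of the parameter
mixing step of McDonald et al., 2010) and sum the per-shard losses."""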
k = len(transform_results)
states = [state for state, sum_loss in transform_results]
state = EpochState(*[sum(coeffs) / k for coeffs in zip(*states)])
sum_loss = sum(loss for state, loss in transform_results)
return state, sum_loss


class ParallelStructuredPerceptron(BaseSequenceClassifier):
"""Structured perceptron for sequence classification.

Similar to StructuredPerceptron, but uses iterative parameter mixing
to enable parallel learning.

XXX: "parallel" doesn't work yet.

References
----------
Ryan Mcdonald, Keith Hall, and Gideon Mann (2010)
Distributed training strategies for the structured perceptron. NAACL'10.
"""
def __init__(self, decode="viterbi", learning_rate=.1, max_iter=10,
random_state=None, verbose=0, n_jobs=1):
self.decode = decode
self.learning_rate = learning_rate
self.max_iter = max_iter
self.random_state = random_state
self.verbose = verbose
self.n_jobs = n_jobs

def fit(self, X, y, lengths):
"""Fit to a set of sequences.

Parameters
----------
X : {array-like, sparse matrix}, shape (n_samples, n_features)
Feature matrix of individual samples.

y : array-like, shape (n_samples,)
Target labels.

lengths : array-like of integers, shape (n_sequences,)
Lengths of the individual sequences in X, y. The sum of these
should be n_samples.

Returns
-------
self : ParallelStructuredPerceptron
"""

classes, y = np.unique(y, return_inverse=True)
class_range = np.arange(len(classes))
Y_true = y.reshape(-1, 1) == class_range
n_classes = Y_true.shape[1]

X = atleast2d_or_csr(X)
n_samples, n_features = X.shape

rng = check_random_state(self.random_state)

perceptrons = OneEpochPerceptron.split_task(X, y, lengths, self.n_jobs,
decode=self.decode,
learning_rate=self.learning_rate,
random_state=rng)

state = EpochState.initial(n_classes, n_features)
for it in range(1, self.max_iter + 1):
if self.verbose:
print("Iteration ", it, end="... ")
sys.stdout.flush()

# XXX: how to make this parallel without copying X, y, etc.
# on each iteration?
results = [p.transform(state) for p in perceptrons]

#with futures.ThreadPoolExecutor(self.n_jobs) as executor:
# jobs = [executor.submit(p.transform, state) for p in perceptrons]
# results = [f.result() for f in futures.as_completed(jobs)]

state, sum_loss = mixed_epoch_states(results)

if self.verbose:
# XXX the loss reported is that for w, but the one for
# w_avg is what matters for early stopping.
print("Loss = {0:.4f}".format(sum_loss / n_samples))

coefs = state.averages(it*len(lengths))
self.coef_, self.coef_init_, self.coef_trans_, self.coef_final_ = coefs
self.classes_ = classes

return self
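
As a rough usage sketch of the estimator proposed in this diff (the toy feature matrix, labels and lengths below are invented, and predict is assumed to keep the predict(X, lengths) interface of StructuredPerceptron):

import numpy as np
from seqlearn.perceptron import ParallelStructuredPerceptron

# Two toy sequences of lengths 3 and 2 over four binary features.
X = np.array([[1, 0, 0, 1],
              [0, 1, 0, 1],
              [0, 0, 1, 0],
              [1, 1, 0, 0],
              [0, 0, 1, 1]], dtype=np.float64)
y = np.array(["B", "I", "O", "B", "O"])
lengths = [3, 2]

clf = ParallelStructuredPerceptron(max_iter=5, n_jobs=2, random_state=0)
clf.fit(X, y, lengths)
print(clf.predict(X, lengths))

As the XXX comments above note, the shards are still processed sequentially; n_jobs currently only controls how many shards the data is split into.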