-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlda.py
96 lines (80 loc) · 3.38 KB
/
lda.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# Labeled Latent Dirichlet Allocation
# This code is available under the MIT License.
# (c)2010 Nakatani Shuyo / Cybozu Labs Inc.
# Reference: Ramage et al., "Labeled LDA: A supervised topic model for credit attribution in multi-labeled corpora" (EMNLP 2009)
# SOURCE: https://github.com/shuyo/iir/blob/master/lda/llda.py
import numpy,os
class LDA:
    """Labeled LDA topic model fitted by collapsed Gibbs sampling.

    Every label is one topic, plus an extra topic named "common" at index 0
    that is allowed in every document.  Typical use:

        model = LDA(K, alpha, beta)
        model.set_corpus(labelset, corpus, labels)
        for _ in range(iterations):
            model.inference()
        phi, theta = model.phi(), model.theta()

    State created by set_corpus():
        labelmap -- dict mapping label name -> topic index
        vocas    -- list of terms, position == term id
        vocas_id -- dict mapping term -> term id
        labels   -- (documents x K) 0/1 array of topics allowed per document
        docs     -- corpus as lists of term ids
        z_m_n    -- topic currently assigned to each token of each document
        n_m_z    -- (documents x K) token counts per document and topic
        n_z_t    -- (K x vocabulary) token counts per topic and term
        n_z      -- (K,) token counts per topic (row sums of n_z_t)
    """

    def __init__(self, K, alpha, beta):
        """Store the hyper-parameters.

        K     -- number of topics; set_corpus() overwrites it with the number
                 of labels including "common".
        alpha -- smoothing added to the document-topic counts.
        beta  -- smoothing added to the topic-term counts.
        """
        self.K = K
        self.alpha = alpha
        self.beta = beta

    def term_to_id(self, term):
        """Return the id of `term`, adding it to the vocabulary if unseen."""
        voca_id = self.vocas_id.get(term)
        if voca_id is None:
            # Ids are handed out densely in order of first appearance.
            voca_id = len(self.vocas)
            self.vocas_id[term] = voca_id
            self.vocas.append(term)
        return voca_id

    def complement_label(self, label):
        """Turn a document's label list into a 0/1 vector over all topics.

        An empty (falsy) `label` allows every topic.  Otherwise topic 0 and
        the topics of the listed labels are allowed.  Raises KeyError for a
        label that is not in `self.labelmap`.
        """
        size = len(self.labelmap)
        if not label:
            return numpy.ones(size)
        vec = numpy.zeros(size)
        vec[0] = 1.0  # index 0 is always switched on
        for x in label:
            vec[self.labelmap[x]] = 1.0
        return vec

    def set_corpus(self, labelset, corpus, labels):
        """Load a corpus and draw a random initial topic for every token.

        labelset -- list of label names.  NOTE: it is modified in place:
                    "common" is inserted at position 0, so after the call
                    labelset[k] is the name of topic k.
        corpus   -- iterable of documents, each an iterable of terms.
        labels   -- one list of label names per document (may be empty).

        Resets the vocabulary, all count tables and `self.K`.
        """
        labelset.insert(0, "common")
        self.labelmap = dict(zip(labelset, range(len(labelset))))
        self.K = len(self.labelmap)

        self.vocas = []
        self.vocas_id = dict()
        self.labels = numpy.array([self.complement_label(label) for label in labels])
        self.docs = [[self.term_to_id(term) for term in doc] for doc in corpus]

        M = len(corpus)
        V = len(self.vocas)

        self.z_m_n = []
        self.n_m_z = numpy.zeros((M, self.K), dtype=int)
        self.n_z_t = numpy.zeros((self.K, V), dtype=int)
        self.n_z = numpy.zeros(self.K, dtype=int)

        for m, (doc, label) in enumerate(zip(self.docs, self.labels)):
            # Uniform over the topics this document is allowed to use;
            # identical for every token, so computed once per document.
            prior = label / label.sum()
            z_n = [numpy.random.multinomial(1, prior).argmax() for _ in doc]
            self.z_m_n.append(z_n)
            for t, z in zip(doc, z_n):
                self.n_m_z[m, z] += 1
                self.n_z_t[z, t] += 1
                self.n_z[z] += 1

    def inference(self):
        """Run one Gibbs sweep: resample the topic of every token once."""
        V = len(self.vocas)
        for m, (doc, label) in enumerate(zip(self.docs, self.labels)):
            z_n = self.z_m_n[m]
            for n, t in enumerate(doc):
                # Take the token out of the counts before resampling it.
                z = z_n[n]
                self.n_m_z[m, z] -= 1
                self.n_z_t[z, t] -= 1
                self.n_z[z] -= 1

                denom_a = self.n_m_z[m].sum() + self.K * self.alpha
                # self.n_z is kept equal to n_z_t.sum(axis=1) by every update
                # in this class, so use it instead of re-summing the whole
                # topic-term table for each token.
                denom_b = self.n_z + V * self.beta
                # `label` zeroes the probability of topics the document is
                # not allowed to use.
                p_z = label * (self.n_z_t[:, t] + self.beta) / denom_b * (self.n_m_z[m] + self.alpha) / denom_a
                new_z = numpy.random.multinomial(1, p_z / p_z.sum()).argmax()

                # Put the token back under its new topic.
                z_n[n] = new_z
                self.n_m_z[m, new_z] += 1
                self.n_z_t[new_z, t] += 1
                self.n_z[new_z] += 1

    def phi(self):
        """Return the (K x vocabulary) smoothed topic-term distribution."""
        V = len(self.vocas)
        return (self.n_z_t + self.beta) / (self.n_z[:, numpy.newaxis] + V * self.beta)

    def theta(self):
        """Return the (documents x K) document-topic distribution.

        alpha is added only to topics allowed by the document's labels.
        """
        n_alpha = self.n_m_z + self.labels * self.alpha
        return n_alpha / n_alpha.sum(axis=1)[:, numpy.newaxis]

    def perplexity(self, docs=None):
        """Return the perplexity of `docs` under the current model.

        docs -- documents as sequences of term ids; defaults to the training
                corpus.  Document i is scored with row i of theta(), i.e. the
                topic mixture of the i-th training document.

        Raises ZeroDivisionError when `docs` contains no tokens.
        """
        # `is None`, not `== None`: comparing a NumPy array with `==` is
        # element-wise and cannot be used as an `if` condition.
        if docs is None:
            docs = self.docs
        phi = self.phi()
        thetas = self.theta()
        log_per = N = 0
        for doc, theta in zip(docs, thetas):
            for w in doc:
                log_per -= numpy.log(numpy.inner(phi[:, w], theta))
            N += len(doc)
        return numpy.exp(log_per / N)