# (export metadata: 374 lines, 13 KiB, Python)
# -*- coding: utf-8 -*-
|
|
# This module is a port of the Textblob Averaged Perceptron Tagger
|
|
# Author: Matthew Honnibal <honnibal+gh@gmail.com>,
|
|
# Long Duong <longdt219@gmail.com> (NLTK port)
|
|
# URL: <https://github.com/sloria/textblob-aptagger>
|
|
# <http://nltk.org/>
|
|
# Copyright 2013 Matthew Honnibal
|
|
# NLTK modifications Copyright 2015 The NLTK Project
|
|
#
|
|
# This module is provided under the terms of the MIT License.
|
|
|
|
import random
|
|
from collections import defaultdict
|
|
import pickle
|
|
import logging
|
|
|
|
from nltk.tag.api import TaggerI
|
|
from nltk.data import find, load
|
|
|
|
from nltk import jsontags
|
|
|
|
try:
|
|
import numpy as np
|
|
except ImportError:
|
|
pass
|
|
|
|
PICKLE = "averaged_perceptron_tagger.pickle"
|
|
|
|
@jsontags.register_tag
class AveragedPerceptron:
    """An averaged perceptron, as implemented by Matthew Honnibal.

    Weights are a dict-of-dicts keyed first by feature name, then by class
    label.  Updates are accumulated so that :meth:`average_weights` can
    replace each weight with its average over all training timesteps.

    See more implementation details here:
    https://explosion.ai/blog/part-of-speech-pos-tagger-in-python
    """

    json_tag = "nltk.tag.perceptron.AveragedPerceptron"

    def __init__(self, weights=None):
        # Each feature gets its own weight vector, so weights is a dict-of-dicts
        self.weights = weights if weights else {}
        self.classes = set()
        # The accumulated values, for the averaging. These will be keyed by
        # feature/class tuples.
        self._totals = defaultdict(int)
        # The last time the feature was changed, for the averaging. Also
        # keyed by feature/class tuples.
        # (tstamps is short for timestamps)
        self._tstamps = defaultdict(int)
        # Number of instances seen
        self.i = 0

    def _softmax(self, scores):
        """Return the softmax of ``scores.values()`` as a numpy array.

        NOTE(review): numpy is imported best-effort at module level; calling
        predict(return_conf=True) without numpy installed raises NameError.
        """
        s = np.fromiter(scores.values(), dtype=float)
        exps = np.exp(s)
        return exps / np.sum(exps)

    def predict(self, features, return_conf=False):
        """Dot-product the features and current weights and return the best label."""
        scores = defaultdict(float)
        for feat, value in features.items():
            if feat not in self.weights or value == 0:
                continue
            weights = self.weights[feat]
            for label, weight in weights.items():
                scores[label] += value * weight

        # Do a secondary alphabetic sort, for stability
        best_label = max(self.classes, key=lambda label: (scores[label], label))
        # Compute the confidence only when requested (requires numpy).
        conf = max(self._softmax(scores)) if return_conf else None

        return best_label, conf

    def update(self, truth, guess, features):
        """Update the feature weights."""

        def upd_feat(c, f, w, v):
            param = (f, c)
            # Credit the old weight for every timestep it survived unchanged,
            # so averaging later only needs the totals and last-change stamps.
            self._totals[param] += (self.i - self._tstamps[param]) * w
            self._tstamps[param] = self.i
            self.weights[f][c] = w + v

        self.i += 1
        if truth == guess:
            return None
        for f in features:
            weights = self.weights.setdefault(f, {})
            upd_feat(truth, f, weights.get(truth, 0.0), 1.0)
            upd_feat(guess, f, weights.get(guess, 0.0), -1.0)

    def average_weights(self):
        """Average weights from all iterations."""
        for feat, weights in self.weights.items():
            new_feat_weights = {}
            for clas, weight in weights.items():
                param = (feat, clas)
                total = self._totals[param]
                # Include the span since the last update of this weight.
                total += (self.i - self._tstamps[param]) * weight
                averaged = round(total / self.i, 3)
                # Drop weights that average to exactly zero to save space.
                if averaged:
                    new_feat_weights[clas] = averaged
            self.weights[feat] = new_feat_weights

    def save(self, path):
        """Save the pickled model weights."""
        with open(path, "wb") as fout:
            pickle.dump(dict(self.weights), fout)

    def load(self, path):
        """Load the pickled model weights."""
        # ``load`` here resolves to the module-level nltk.data.load import,
        # not this method (methods aren't in scope inside the body).
        self.weights = load(path)

    def encode_json_obj(self):
        return self.weights

    @classmethod
    def decode_json_obj(cls, obj):
        return cls(obj)
|
|
|
|
|
@jsontags.register_tag
class PerceptronTagger(TaggerI):
    """
    Greedy Averaged Perceptron tagger, as implemented by Matthew Honnibal.
    See more implementation details here:
    https://explosion.ai/blog/part-of-speech-pos-tagger-in-python

    >>> from nltk.tag.perceptron import PerceptronTagger

    Train the model

    >>> tagger = PerceptronTagger(load=False)

    >>> tagger.train([[('today','NN'),('is','VBZ'),('good','JJ'),('day','NN')],
    ... [('yes','NNS'),('it','PRP'),('beautiful','JJ')]])

    >>> tagger.tag(['today','is','a','beautiful','day'])
    [('today', 'NN'), ('is', 'PRP'), ('a', 'PRP'), ('beautiful', 'JJ'), ('day', 'NN')]

    Use the pretrain model (the default constructor)

    >>> pretrain = PerceptronTagger()

    >>> pretrain.tag('The quick brown fox jumps over the lazy dog'.split())
    [('The', 'DT'), ('quick', 'JJ'), ('brown', 'NN'), ('fox', 'NN'), ('jumps', 'VBZ'), ('over', 'IN'), ('the', 'DT'), ('lazy', 'JJ'), ('dog', 'NN')]

    >>> pretrain.tag("The red cat".split())
    [('The', 'DT'), ('red', 'JJ'), ('cat', 'NN')]
    """

    json_tag = "nltk.tag.sequential.PerceptronTagger"

    START = ["-START-", "-START2-"]
    END = ["-END-", "-END2-"]

    def __init__(self, load=True):
        """
        :param load: Load the pickled model upon instantiation.
        """
        self.model = AveragedPerceptron()
        self.tagdict = {}
        self.classes = set()
        if load:
            AP_MODEL_LOC = "file:" + str(
                find("taggers/averaged_perceptron_tagger/" + PICKLE)
            )
            self.load(AP_MODEL_LOC)

    def tag(self, tokens, return_conf=False, use_tagdict=True):
        """
        Tag tokenized sentences.

        :param tokens: list of words
        :type tokens: list(str)
        :param return_conf: if true, emit (word, tag, confidence) triples
        :param use_tagdict: if true, look unambiguous frequent words up in
            the tag dictionary (confidence 1.0) before using the model
        """
        prev, prev2 = self.START
        output = []

        context = self.START + [self.normalize(w) for w in tokens] + self.END
        for i, word in enumerate(tokens):
            tag, conf = (
                (self.tagdict.get(word), 1.0) if use_tagdict else (None, None)
            )
            if not tag:
                features = self._get_features(i, word, context, prev, prev2)
                tag, conf = self.model.predict(features, return_conf)
            output.append((word, tag, conf) if return_conf else (word, tag))

            prev2 = prev
            prev = tag

        return output

    def train(self, sentences, save_loc=None, nr_iter=5):
        """Train a model from sentences, and save it at ``save_loc``. ``nr_iter``
        controls the number of Perceptron training iterations.

        :param sentences: A list or iterator of sentences, where each sentence
            is a list of (words, tags) tuples.
        :param save_loc: If not ``None``, saves a pickled model in this location.
        :param nr_iter: Number of training iterations.
        """
        # We'd like to allow ``sentences`` to be either a list or an iterator,
        # the latter being especially important for a large training dataset.
        # Because ``self._make_tagdict(sentences)`` runs regardless, we make
        # it populate ``self._sentences`` (a list) with all the sentences.
        # This saves the overhead of just iterating through ``sentences`` to
        # get the list by ``sentences = list(sentences)``.

        self._sentences = list()  # to be populated by self._make_tagdict...
        self._make_tagdict(sentences)
        self.model.classes = self.classes
        for iter_ in range(nr_iter):
            c = 0
            n = 0
            for sentence in self._sentences:
                words, tags = zip(*sentence)

                prev, prev2 = self.START
                context = self.START + [self.normalize(w) for w in words] + self.END
                for i, word in enumerate(words):
                    guess = self.tagdict.get(word)
                    if not guess:
                        feats = self._get_features(i, word, context, prev, prev2)
                        guess, _ = self.model.predict(feats)
                        # Only update on words the model (not the tagdict)
                        # predicted; ``feats`` is defined only in this branch.
                        self.model.update(tags[i], guess, feats)
                    prev2 = prev
                    prev = guess
                    c += guess == tags[i]
                    n += 1
            random.shuffle(self._sentences)
            # Lazy %-style args: the message is only formatted if emitted.
            logging.info("Iter %s: %s/%s=%s", iter_, c, n, _pc(c, n))

        # We don't need the training sentences anymore, and we don't want to
        # waste space on them when we pickle the trained tagger.
        self._sentences = None

        self.model.average_weights()
        # Pickle as a binary file
        if save_loc is not None:
            with open(save_loc, "wb") as fout:
                # changed protocol from -1 to 2 to make pickling Python 2 compatible
                pickle.dump((self.model.weights, self.tagdict, self.classes), fout, 2)

    def load(self, loc):
        """
        :param loc: Load a pickled model at location.
        :type loc: str
        """
        # ``load`` here resolves to the module-level nltk.data.load import.
        self.model.weights, self.tagdict, self.classes = load(loc)
        self.model.classes = self.classes

    def encode_json_obj(self):
        return self.model.weights, self.tagdict, list(self.classes)

    @classmethod
    def decode_json_obj(cls, obj):
        tagger = cls(load=False)
        tagger.model.weights, tagger.tagdict, tagger.classes = obj
        tagger.classes = set(tagger.classes)
        tagger.model.classes = tagger.classes
        return tagger

    def normalize(self, word):
        """
        Normalization used in pre-processing.

        - All words are lower cased
        - Groups of digits of length 4 are represented as !YEAR;
        - Other digits are represented as !DIGITS

        :rtype: str
        """
        # NOTE(review): assumes a non-empty token; word[0] raises IndexError
        # on "" — callers pass tokenized words, so this should not occur.
        if "-" in word and word[0] != "-":
            return "!HYPHEN"
        elif word.isdigit() and len(word) == 4:
            return "!YEAR"
        elif word[0].isdigit():
            return "!DIGITS"
        else:
            return word.lower()

    def _get_features(self, i, word, context, prev, prev2):
        """Map tokens into a feature representation, implemented as a
        {hashable: int} dict. If the features change, a new model must be
        trained.
        """

        def add(name, *args):
            features[" ".join((name,) + tuple(args))] += 1

        # Offset i so it indexes ``context``, which is padded with START.
        i += len(self.START)
        features = defaultdict(int)
        # It's useful to have a constant feature, which acts sort of like a prior
        add("bias")
        add("i suffix", word[-3:])
        add("i pref1", word[0])
        add("i-1 tag", prev)
        add("i-2 tag", prev2)
        add("i tag+i-2 tag", prev, prev2)
        add("i word", context[i])
        add("i-1 tag+i word", prev, context[i])
        add("i-1 word", context[i - 1])
        add("i-1 suffix", context[i - 1][-3:])
        add("i-2 word", context[i - 2])
        add("i+1 word", context[i + 1])
        add("i+1 suffix", context[i + 1][-3:])
        add("i+2 word", context[i + 2])
        return features

    def _make_tagdict(self, sentences):
        """
        Make a tag dictionary for single-tag words.

        :param sentences: A list of list of (word, tag) tuples.
        """
        counts = defaultdict(lambda: defaultdict(int))
        for sentence in sentences:
            self._sentences.append(sentence)
            for word, tag in sentence:
                counts[word][tag] += 1
                self.classes.add(tag)
        freq_thresh = 20
        ambiguity_thresh = 0.97
        for word, tag_freqs in counts.items():
            tag, mode = max(tag_freqs.items(), key=lambda item: item[1])
            n = sum(tag_freqs.values())
            # Don't add rare words to the tag dictionary
            # Only add quite unambiguous words
            if n >= freq_thresh and (mode / n) >= ambiguity_thresh:
                self.tagdict[word] = tag
|
|
|
def _pc(n, d):
|
|
return (n / d) * 100
|
|
|
|
|
|
def _load_data_conll_format(filename):
|
|
print("Read from file: ", filename)
|
|
with open(filename, "rb") as fin:
|
|
sentences = []
|
|
sentence = []
|
|
for line in fin.readlines():
|
|
line = line.strip()
|
|
# print line
|
|
if len(line) == 0:
|
|
sentences.append(sentence)
|
|
sentence = []
|
|
continue
|
|
tokens = line.split("\t")
|
|
word = tokens[1]
|
|
tag = tokens[4]
|
|
sentence.append((word, tag))
|
|
return sentences
|
|
|
|
|
|
def _get_pretrain_model():
    """Train and evaluate a tagger on the English ConLL data
    (the WSJ part of the Penn Treebank), then save it to ``PICKLE``.

    Train: sections 2-11.  Test: section 23.
    """
    # load=False: we train from scratch, so there is no point loading the
    # pretrained pickle (and it would fail if the model isn't downloaded).
    tagger = PerceptronTagger(load=False)
    training = _load_data_conll_format("english_ptb_train.conll")
    testing = _load_data_conll_format("english_ptb_test.conll")
    print("Size of training and testing (sentence)", len(training), len(testing))
    # Train and save the model
    tagger.train(training, PICKLE)
    print("Accuracy : ", tagger.evaluate(testing))
|
|
|
|
|
|
if __name__ == "__main__":
    # Retraining the pretrained model is expensive and needs the ConLL data
    # files on disk; uncomment to run it deliberately.
    # _get_pretrain_model()
    pass
|