Module wuggy.utilities.bigramchain
Expand source code
import random
from collections import defaultdict, namedtuple
Link = namedtuple('Link', ['position', 'value'])
class BigramChain(defaultdict):
"""
A dictionary storing the next possible value, given a list of input sequences.
"""
def __init__(self, language_plugin, data=None, encoding='utf-8', size=100, cutoff=1, token=False):
defaultdict.__init__(self, dict)
self.language_plugin = language_plugin
try:
self.hidden_sequence = self.language_plugin.hidden_sequence
except AttributeError:
self.hidden_sequence = False
if data != None:
self.load(data, size=size, cutoff=cutoff, token=token)
self.startkeys = []
self.limit_frequencies = {}
def load(self, datafile, size=100, cutoff=1, token=False):
lines = datafile.readlines()
for i, line in enumerate(lines):
fields = line.strip('\n\t').split(self.language_plugin.separator)
reference, input_sequence, frequency = fields
frequency = float(frequency) if token == True else 1
frequency = 1
sequence = (self.language_plugin.transform(
input_sequence, frequency))
n = len(sequence.representation)
if frequency >= cutoff and random.randint(1, 100) <= size:
for j in range(n):
key = Link(j, sequence.representation[j])
if j+1 < n:
next_key = Link(j+1, sequence.representation[j+1])
self[key][next_key] = self[key].get(
next_key, 0)+sequence.frequency
else:
pass
datafile.close()
self.set_startkeys()
def set_startkeys(self, reference_sequence=None, fields=None):
if fields == None:
fields = self.language_plugin.default_fields
if reference_sequence == None:
self.startkeys = dict([(key, 0)
for key in self.keys() if key.position == 0])
else:
self.startkeys = {}
for key in self.keys():
if key.position == 0:
self.startkeys[key] = 0
def get_frequencies(self, reference_sequence):
frequencies = {}
for position in range(len(reference_sequence)-1):
key = Link(position, reference_sequence[position])
nextkey = Link(position+1, reference_sequence[position+1])
try:
frequency = self[key][nextkey]
except KeyError:
frequency = 0
frequencies[position] = frequency
return frequencies
def build_limit_frequencies(self, fields):
limits = defaultdict(dict)
for key, nextkeys in self.items():
position, value = key
subkey_a = (position, tuple(
[value.__getattribute__(field) for field in fields]))
for nextkey, frequency in nextkeys.items():
position, value = nextkey
subkey_b = (position, tuple(
[value.__getattribute__(field) for field in fields]))
subkey = (subkey_a, subkey_b)
minfrequency = limits[subkey].get('min', frequency)
limits[subkey]['min'] = min(minfrequency, frequency)
maxfrequency = limits[subkey].get('max', frequency)
limits[subkey]['max'] = max(maxfrequency, frequency)
self.limit_frequencies[tuple(fields)] = limits
def frequency_filter(self, reference_sequence, lower, upper, kind='dev'):
result = BigramChain(self.language_plugin)
frequencies = self.get_frequencies(reference_sequence)
for key, nextkeys in self.items():
try:
if kind == 'dev':
minfreq = frequencies[key.position]-lower
maxfreq = frequencies[key.position]+upper
elif kind == 'limit':
minfreq = lower
maxfreq = upper
except:
pass
else:
for nextkey, frequency in nextkeys.items():
if minfreq <= frequency <= maxfreq:
result[key][nextkey] = frequency
result = result.clean(len(reference_sequence)-1)
result.set_startkeys()
return result
def segmentset_filter(self, reference_sequence, segmentset):
segmentset = segmentset.union(set(('^', '$')))
result = BigramChain(self.language_plugin)
for key, nextkeys in self.items():
if key.value.letters in segmentset:
for nextkey, frequency in nextkeys.items():
if nextkey.value.letters in segmentset:
result[key][nextkey] = frequency
result = result.clean(len(reference_sequence)-1)
result.set_startkeys()
return result
def attribute_filter(self, reference_sequence, attribute):
result = BigramChain(self.language_plugin)
if type(reference_sequence[0]) == self.language_plugin.Segment:
for key, nextkeys in self.items():
try:
if key.value.__getattribute__(attribute) == reference_sequence[
key.position].__getattribute__(attribute):
result[key] = self[key]
except IndexError:
pass
else:
for key, nextkeys in self.items():
try:
if key.value.__getattribute__(attribute) == reference_sequence[key.position]:
result[key] = self[key]
except IndexError:
pass
return result
def clean(self, maxpos):
"""
Remove chains that can not be completed.
"""
result = BigramChain(self.language_plugin)
for key, nextkeys in self.items():
for nextkey, frequency in nextkeys.items():
if nextkey in self or nextkey.position == maxpos:
result[key][nextkey] = frequency
if len(self) == len(result):
return result
else:
return result.clean(maxpos)
def generate(self, startkeys=None):
if startkeys is None:
startkeys = self.startkeys
startkeys = list(startkeys.items())
random.shuffle(startkeys)
startkeys = dict(startkeys)
if len(self) > 0:
for key in startkeys:
if key not in self:
yield (key.value,)
else:
next_keys = self[key]
for result in self.generate(next_keys):
yield (key.value,)+result
else:
raise Exception('LinkError')
def display(self):
for key, nextkeys in sorted(self.items(), key=lambda x: x):
print('***', key.position, key.value)
for nextkey, frequency in nextkeys.items():
print(nextkey.value, frequency)
Classes
class BigramChain (language_plugin, data=None, encoding='utf-8', size=100, cutoff=1, token=False)
-
A dictionary storing the next possible value, given a list of input sequences.
Expand source code
class BigramChain(defaultdict): """ A dictionary storing the next possible value, given a list of input sequences. """ def __init__(self, language_plugin, data=None, encoding='utf-8', size=100, cutoff=1, token=False): defaultdict.__init__(self, dict) self.language_plugin = language_plugin try: self.hidden_sequence = self.language_plugin.hidden_sequence except AttributeError: self.hidden_sequence = False if data != None: self.load(data, size=size, cutoff=cutoff, token=token) self.startkeys = [] self.limit_frequencies = {} def load(self, datafile, size=100, cutoff=1, token=False): lines = datafile.readlines() for i, line in enumerate(lines): fields = line.strip('\n\t').split(self.language_plugin.separator) reference, input_sequence, frequency = fields frequency = float(frequency) if token == True else 1 frequency = 1 sequence = (self.language_plugin.transform( input_sequence, frequency)) n = len(sequence.representation) if frequency >= cutoff and random.randint(1, 100) <= size: for j in range(n): key = Link(j, sequence.representation[j]) if j+1 < n: next_key = Link(j+1, sequence.representation[j+1]) self[key][next_key] = self[key].get( next_key, 0)+sequence.frequency else: pass datafile.close() self.set_startkeys() def set_startkeys(self, reference_sequence=None, fields=None): if fields == None: fields = self.language_plugin.default_fields if reference_sequence == None: self.startkeys = dict([(key, 0) for key in self.keys() if key.position == 0]) else: self.startkeys = {} for key in self.keys(): if key.position == 0: self.startkeys[key] = 0 def get_frequencies(self, reference_sequence): frequencies = {} for position in range(len(reference_sequence)-1): key = Link(position, reference_sequence[position]) nextkey = Link(position+1, reference_sequence[position+1]) try: frequency = self[key][nextkey] except KeyError: frequency = 0 frequencies[position] = frequency return frequencies def build_limit_frequencies(self, fields): limits = defaultdict(dict) for key, nextkeys in self.items(): position, value = key subkey_a = (position, tuple( [value.__getattribute__(field) for field in fields])) for nextkey, frequency in nextkeys.items(): position, value = nextkey subkey_b = (position, tuple( [value.__getattribute__(field) for field in fields])) subkey = (subkey_a, subkey_b) minfrequency = limits[subkey].get('min', frequency) limits[subkey]['min'] = min(minfrequency, frequency) maxfrequency = limits[subkey].get('max', frequency) limits[subkey]['max'] = max(maxfrequency, frequency) self.limit_frequencies[tuple(fields)] = limits def frequency_filter(self, reference_sequence, lower, upper, kind='dev'): result = BigramChain(self.language_plugin) frequencies = self.get_frequencies(reference_sequence) for key, nextkeys in self.items(): try: if kind == 'dev': minfreq = frequencies[key.position]-lower maxfreq = frequencies[key.position]+upper elif kind == 'limit': minfreq = lower maxfreq = upper except: pass else: for nextkey, frequency in nextkeys.items(): if minfreq <= frequency <= maxfreq: result[key][nextkey] = frequency result = result.clean(len(reference_sequence)-1) result.set_startkeys() return result def segmentset_filter(self, reference_sequence, segmentset): segmentset = segmentset.union(set(('^', '$'))) result = BigramChain(self.language_plugin) for key, nextkeys in self.items(): if key.value.letters in segmentset: for nextkey, frequency in nextkeys.items(): if nextkey.value.letters in segmentset: result[key][nextkey] = frequency result = result.clean(len(reference_sequence)-1) result.set_startkeys() return result def attribute_filter(self, reference_sequence, attribute): result = BigramChain(self.language_plugin) if type(reference_sequence[0]) == self.language_plugin.Segment: for key, nextkeys in self.items(): try: if key.value.__getattribute__(attribute) == reference_sequence[ key.position].__getattribute__(attribute): result[key] = self[key] except IndexError: pass else: for key, nextkeys in self.items(): try: if key.value.__getattribute__(attribute) == reference_sequence[key.position]: result[key] = self[key] except IndexError: pass return result def clean(self, maxpos): """ Remove chains that can not be completed. """ result = BigramChain(self.language_plugin) for key, nextkeys in self.items(): for nextkey, frequency in nextkeys.items(): if nextkey in self or nextkey.position == maxpos: result[key][nextkey] = frequency if len(self) == len(result): return result else: return result.clean(maxpos) def generate(self, startkeys=None): if startkeys is None: startkeys = self.startkeys startkeys = list(startkeys.items()) random.shuffle(startkeys) startkeys = dict(startkeys) if len(self) > 0: for key in startkeys: if key not in self: yield (key.value,) else: next_keys = self[key] for result in self.generate(next_keys): yield (key.value,)+result else: raise Exception('LinkError') def display(self): for key, nextkeys in sorted(self.items(), key=lambda x: x): print('***', key.position, key.value) for nextkey, frequency in nextkeys.items(): print(nextkey.value, frequency)
Ancestors
- collections.defaultdict
- builtins.dict
Methods
def attribute_filter(self, reference_sequence, attribute)
-
Expand source code
def attribute_filter(self, reference_sequence, attribute): result = BigramChain(self.language_plugin) if type(reference_sequence[0]) == self.language_plugin.Segment: for key, nextkeys in self.items(): try: if key.value.__getattribute__(attribute) == reference_sequence[ key.position].__getattribute__(attribute): result[key] = self[key] except IndexError: pass else: for key, nextkeys in self.items(): try: if key.value.__getattribute__(attribute) == reference_sequence[key.position]: result[key] = self[key] except IndexError: pass return result
def build_limit_frequencies(self, fields)
-
Expand source code
def build_limit_frequencies(self, fields): limits = defaultdict(dict) for key, nextkeys in self.items(): position, value = key subkey_a = (position, tuple( [value.__getattribute__(field) for field in fields])) for nextkey, frequency in nextkeys.items(): position, value = nextkey subkey_b = (position, tuple( [value.__getattribute__(field) for field in fields])) subkey = (subkey_a, subkey_b) minfrequency = limits[subkey].get('min', frequency) limits[subkey]['min'] = min(minfrequency, frequency) maxfrequency = limits[subkey].get('max', frequency) limits[subkey]['max'] = max(maxfrequency, frequency) self.limit_frequencies[tuple(fields)] = limits
def clean(self, maxpos)
-
Remove chains that can not be completed.
Expand source code
def clean(self, maxpos): """ Remove chains that can not be completed. """ result = BigramChain(self.language_plugin) for key, nextkeys in self.items(): for nextkey, frequency in nextkeys.items(): if nextkey in self or nextkey.position == maxpos: result[key][nextkey] = frequency if len(self) == len(result): return result else: return result.clean(maxpos)
def display(self)
-
Expand source code
def display(self): for key, nextkeys in sorted(self.items(), key=lambda x: x): print('***', key.position, key.value) for nextkey, frequency in nextkeys.items(): print(nextkey.value, frequency)
def frequency_filter(self, reference_sequence, lower, upper, kind='dev')
-
Expand source code
def frequency_filter(self, reference_sequence, lower, upper, kind='dev'): result = BigramChain(self.language_plugin) frequencies = self.get_frequencies(reference_sequence) for key, nextkeys in self.items(): try: if kind == 'dev': minfreq = frequencies[key.position]-lower maxfreq = frequencies[key.position]+upper elif kind == 'limit': minfreq = lower maxfreq = upper except: pass else: for nextkey, frequency in nextkeys.items(): if minfreq <= frequency <= maxfreq: result[key][nextkey] = frequency result = result.clean(len(reference_sequence)-1) result.set_startkeys() return result
def generate(self, startkeys=None)
-
Expand source code
def generate(self, startkeys=None): if startkeys is None: startkeys = self.startkeys startkeys = list(startkeys.items()) random.shuffle(startkeys) startkeys = dict(startkeys) if len(self) > 0: for key in startkeys: if key not in self: yield (key.value,) else: next_keys = self[key] for result in self.generate(next_keys): yield (key.value,)+result else: raise Exception('LinkError')
def get_frequencies(self, reference_sequence)
-
Expand source code
def get_frequencies(self, reference_sequence): frequencies = {} for position in range(len(reference_sequence)-1): key = Link(position, reference_sequence[position]) nextkey = Link(position+1, reference_sequence[position+1]) try: frequency = self[key][nextkey] except KeyError: frequency = 0 frequencies[position] = frequency return frequencies
def load(self, datafile, size=100, cutoff=1, token=False)
-
Expand source code
def load(self, datafile, size=100, cutoff=1, token=False): lines = datafile.readlines() for i, line in enumerate(lines): fields = line.strip('\n\t').split(self.language_plugin.separator) reference, input_sequence, frequency = fields frequency = float(frequency) if token == True else 1 frequency = 1 sequence = (self.language_plugin.transform( input_sequence, frequency)) n = len(sequence.representation) if frequency >= cutoff and random.randint(1, 100) <= size: for j in range(n): key = Link(j, sequence.representation[j]) if j+1 < n: next_key = Link(j+1, sequence.representation[j+1]) self[key][next_key] = self[key].get( next_key, 0)+sequence.frequency else: pass datafile.close() self.set_startkeys()
def segmentset_filter(self, reference_sequence, segmentset)
-
Expand source code
def segmentset_filter(self, reference_sequence, segmentset): segmentset = segmentset.union(set(('^', '$'))) result = BigramChain(self.language_plugin) for key, nextkeys in self.items(): if key.value.letters in segmentset: for nextkey, frequency in nextkeys.items(): if nextkey.value.letters in segmentset: result[key][nextkey] = frequency result = result.clean(len(reference_sequence)-1) result.set_startkeys() return result
def set_startkeys(self, reference_sequence=None, fields=None)
-
Expand source code
def set_startkeys(self, reference_sequence=None, fields=None): if fields == None: fields = self.language_plugin.default_fields if reference_sequence == None: self.startkeys = dict([(key, 0) for key in self.keys() if key.position == 0]) else: self.startkeys = {} for key in self.keys(): if key.position == 0: self.startkeys[key] = 0
class Link (position, value)
-
Link(position, value)
Ancestors
- builtins.tuple
Instance variables
var position
-
Alias for field number 0
var value
-
Alias for field number 1