1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
|
import random
import time
import math
import torch
import torch.nn as nn
import warnings
import sys
import copy
from util import AtomicNumber
# seconds between progress prints from the training workers (throttles console output)
PRINT_INFORMATION_SECONDS = 2
# default number of worker processes used for multiprocess training
num_processes = 12
# silence library warnings globally for this script
warnings.filterwarnings('ignore')
# Decide where to run: the user can force CPU with --disable-cuda,
# otherwise use CUDA whenever a device is available.
cuda = ("--disable-cuda" not in sys.argv) and torch.cuda.is_available()
print(f"CUDA is {'enabled' if cuda else 'disabled'}")
if cuda:
    print("CUDA devices:")
    for idx in range(torch.cuda.device_count()):
        print(f"{idx}|\t{torch.cuda.get_device_name(idx)}")
# single module-wide device object used by every tensor allocation below
device = torch.device("cuda") if cuda else torch.device("cpu")
class RNN(nn.Module):
    """Character-level recurrent network.

    Each step consumes a one-hot letter plus the previous hidden state and
    emits log-probabilities (LogSoftmax) over the alphabet, suitable for
    NLLLoss. `random_factor > 0` mixes Gaussian noise into the output to
    diversify sampling.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        # create the input, hidden and output linear transformation branches
        self.input_to_hidden = nn.Linear(input_size + hidden_size, hidden_size, device=device)
        self.input_to_output = nn.Linear(input_size + hidden_size, output_size, device=device)
        self.output_to_output = nn.Linear(hidden_size + output_size, output_size, device=device)
        # dropout applied to the output during training
        self.dropout = nn.Dropout(0.1)
        # LogSoftmax pairs with NLLLoss during training
        self.softmax = nn.LogSoftmax(dim=1)
        # do not introduce any randomness by default
        self.random_factor = 0

    def forward(self, inputs, hidden):
        """One recurrence step.

        Args:
            inputs: (tensor) one-hot letter, shape (1, input_size)
            hidden: (tensor) previous hidden state, shape (1, hidden_size)

        Returns:
            output: (tensor) log-probabilities over the alphabet, (1, output_size)
            hidden: (tensor) new hidden state, (1, hidden_size)
        """
        # combine the input with the hidden layer to produce the new hidden and output
        input_combined = torch.cat((inputs, hidden), 1)
        hidden = self.input_to_hidden(input_combined)
        output = self.input_to_output(input_combined)
        output_combined = torch.cat((hidden, output), 1)
        output = self.output_to_output(output_combined)
        output = self.dropout(output)
        output = self.softmax(output)
        # add noise to the output, scaled by self.random_factor
        if self.random_factor > 0:
            # BUG FIX: allocate the noise on the same device as the output;
            # the old torch.randn(self.output_size) was CPU-only and raised a
            # device-mismatch error when the model ran on CUDA.
            random_tensor = torch.randn(self.output_size, device=output.device)
            output = torch.add(output, random_tensor, alpha=self.random_factor)
        return output, hidden

    def initHidden(self):
        """Return a fresh all-zero hidden state of shape (1, hidden_size)."""
        return torch.zeros(1, self.hidden_size, device=device)
# instantiate the function to use to calculate loss:
# negative log-likelihood, which pairs with the LogSoftmax output of the RNN
criterion = nn.NLLLoss()
# define the learning rate, to begin with, we can use 0.0005
learning_rate = 0.0005
def train_rnn(rnn, input_tensor, target_tensor):
    """Train the network on a single name.

    Args:
        rnn: (RNN) the rnn to train
        input_tensor: (tensor) one-hot encoding of the name from the first
            letter to the last letter, excluding the end-of-string marker
        target_tensor: (tensor) class indexes from the second letter through
            the end-of-string marker

    Returns:
        output: (tensor) the network output for the final letter
        loss: (float) the average loss per letter
    """
    # add a trailing dimension so each target is a length-1 batch;
    # non-mutating: the old unsqueeze_(-1) modified the caller's tensor in place
    target_tensor = target_tensor.unsqueeze(-1)
    # reset the state of the neural network
    hidden = rnn.initHidden()
    rnn.zero_grad()
    # accumulate the error between each step's output and its target
    loss = 0
    for i in range(input_tensor.size(0)):
        output, hidden = rnn(input_tensor[i], hidden)
        loss += criterion(output, target_tensor[i])
    loss.backward()
    # manual SGD step: move each parameter against its gradient
    for p in rnn.parameters():
        # modern add_ signature; the two-positional add_(-lr, grad) form is deprecated
        p.data.add_(p.grad.data, alpha=-learning_rate)
    return output, loss.item() / input_tensor.size(0)
def input_tensor(name, alphabet, value=1, out_device=None):
    """Create the input tensor for a name: a one-hot matrix from the first
    letter to the last letter (excluding EOS).

    Args:
        name: (str[]) the letters of the name; a plain string also works
        alphabet: (str[]) the alphabet used for encoding, starting with a
            "NULL" character at index 0 and ending with an "EOS" character
        value: (float) (default=1) the value to store for the "hot" entry
        out_device: (default=None) device for the tensor; falls back to the
            module-level default device

    Returns:
        tensor: (tensor) shape (len(name), 1, len(alphabet))
    """
    if out_device is None:
        out_device = device
    tensor = torch.zeros(len(name), 1, len(alphabet), device=out_device)
    # mark the position of each letter in the encoding
    for li, letter in enumerate(name):
        # letters outside the alphabet map to the "NULL" character at index 0
        index = alphabet.index(letter) if letter in alphabet else 0
        tensor[li][0][index] = value
    return tensor
def target_tensor(name, alphabet, out_device=None):
    """Create the target tensor for a name: letter class indexes from the
    second letter through EOS.

    Args:
        name: (str[]) the letters of the name; a plain string also works
        alphabet: (str[]) the alphabet used for encoding, starting with a
            "NULL" character at index 0 and ending with an "EOS" character
        out_device: (default=None) device for the tensor; falls back to the
            module-level default device

    Returns:
        tensor: (tensor) long tensor of length len(name)
    """
    if out_device is None:
        out_device = device
    indexes = []
    for li in range(1, len(name)):
        letter = name[li]
        # letters outside the alphabet map to the "NULL" character at index 0
        indexes.append(alphabet.index(letter) if letter in alphabet else 0)
    # terminate with the EOS class (last alphabet entry)
    indexes.append(len(alphabet) - 1)
    # torch.tensor replaces the legacy torch.LongTensor / torch.cuda.LongTensor split
    return torch.tensor(indexes, dtype=torch.long, device=out_device)
def learn_names(rnn, names, alphabet, iterations=100000, num_processes=12):
    """Train a neural network on a list of names with a given alphabet.

    Args:
        rnn: (RNN) the neural network to train
        names: (str[]) the list of names to train on
        alphabet: (str[]) the alphabet used to encode characters
        iterations: (int) (default=100000) total iterations across all processes
        num_processes: (int) (default=12) number of worker processes to spawn
    """
    # remember when training started so workers can report elapsed time
    start = time.time()
    # floor-divide the work between the workers
    iters_per_process = iterations // num_processes
    # shared atomic counters so the workers can report aggregate progress
    total_loss = AtomicNumber()
    completed_iterations = AtomicNumber()
    # timestamp of the last progress print, so the workers only print
    # once every PRINT_INFORMATION_SECONDS
    last_print = AtomicNumber()
    print(f"Training on {len(names)} names...")
    # spawn the worker processes, each running _train, and wait for them
    torch.multiprocessing.spawn(_train,
                                args=(rnn, names, alphabet, iters_per_process,
                                      total_loss, completed_iterations, last_print,
                                      start, iterations),
                                nprocs=num_processes,
                                join=True)
    print()
def _train(rank, rnn, names, alphabet, iterations,
           total_loss, completed_iterations, last_print,
           start, total_iterations):
    """Worker entry point for learn_names (called via torch.multiprocessing.spawn).

    Trains the shared rnn on `iterations` randomly chosen names and
    periodically prints aggregate progress.

    Args:
        rank: (int) worker index supplied by spawn
        rnn: (RNN) the shared network to train
        names: (str[]) names to sample training examples from
        alphabet: (str[]) alphabet used to encode the names
        iterations: (int) iterations this worker should run
        total_loss / completed_iterations / last_print: (AtomicNumber) shared counters
        start: (float) training start timestamp
        total_iterations: (int) total iterations across all workers
    """
    for i in range(1, iterations + 1):
        try:
            # choose a random name to train on
            name = random.choice(names)
            # build the one-hot input and the class-index target
            input_name_tensor = input_tensor(name, alphabet)
            target_name_tensor = target_tensor(name, alphabet)
            output, loss = train_rnn(rnn, input_name_tensor, target_name_tensor)
            total_loss.increment(loss)
            completed_iterations.increment()
            # throttle console output: only print every PRINT_INFORMATION_SECONDS
            if time.time() - last_print.get() > PRINT_INFORMATION_SECONDS:
                # claim the print slot first to stop other workers printing too
                last_print.set(time.time())
                seconds_elapsed = time.time() - start
                time_elapsed = "%dm %ds" % (math.floor(seconds_elapsed / 60), seconds_elapsed % 60)
                percentage = completed_iterations.get() / total_iterations * 100
                # overwrite the previous progress line
                print("\r%s (%d %d%%) %.4f" % (time_elapsed, completed_iterations.get(), percentage, loss), end="")
        except Exception:
            # best-effort: skip a bad name/iteration, but unlike the old bare
            # `except:` do not swallow KeyboardInterrupt/SystemExit
            continue
def sample(rnn, alphabet, start_letter, max_length=50):
    """Sample a name from the network, seeded with a starting letter.

    Args:
        rnn: (RNN) the neural network to sample from
        alphabet: (str[]) the alphabet used to decode the network outputs
        start_letter: (str) the letter fed to the network as the first input
        max_length: (int) (default=50) the maximum length for a name

    Returns:
        output_name: (str[]) the generated letters, including start_letter
    """
    # sampling needs no gradients; no_grad saves memory and time
    with torch.no_grad():
        sample_input = input_tensor(start_letter, alphabet)
        # reset hidden layer
        hidden = rnn.initHidden()
        output_name = [start_letter]
        # cap the length to prevent names from running on forever
        for _ in range(max_length):
            # call the rnn for the next letter
            output, hidden = rnn(sample_input[0], hidden)
            # greedy decode: take the most likely class
            top_v, top_i = output.topk(1)
            top_i = top_i[0][0]
            if top_i == len(alphabet) - 1:
                # EOS has been reached
                break
            # append the next letter and feed it back in as the next input
            letter = alphabet[top_i]
            output_name.append(letter)
            sample_input = input_tensor(letter, alphabet)
    return output_name
# command-line entry point: train a model or sample names from a saved one
if __name__ == "__main__":
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # index 0 (" ") doubles as the "NULL" character
        english_alphabet = [c for c in " abcdefghijklmnopqrstuvwxyz"]
        english_alphabet.append("")  # add the EOS character
        option = input("(t)rain or (s)ample?")
        if option == "t":
            with open("data/datasets/usa/surname.txt", "r") as datafile:
                # lowercase each name and strip only the trailing newline;
                # the old name[:-1] chopped the last letter off a final line
                # that had no newline
                names = [line.rstrip("\n").lower() for line in datafile]
            # create the neural network with a hidden layer of size 128
            rnn = RNN(len(english_alphabet), 128, len(english_alphabet))
            # transfer to cuda if cuda is enabled
            if cuda:
                rnn.cuda()
            learn_names(rnn, names, english_alphabet, iterations=100000, num_processes=12)
            print()
            torch.save(rnn, "data/english_names.pt")
        elif option == "s":
            # map_location lets a GPU-trained checkpoint load on a CPU-only machine
            rnn = torch.load("data/english_names.pt", map_location=device)
            if cuda:
                rnn.cuda()
            rnn.eval()
            # add sampling noise so repeated runs produce different names
            rnn.random_factor = 0.7
            for start_letter in "abcdefghijklmnopqrstuvwxyz":
                print(sample(rnn, english_alphabet, start_letter))
        else:
            print("invalid option!")
|