Word2Vec Embedding Model Source Code

Date: 23.05.16

Writer: 9tailwolf : doryeon514@gm.gist.ac.kr

CBOW by using Pytorch
Initial Skip_gram by using Pytorch
Subsampling
Word2Vec by Pytorch


CBOW by using Pytorch


Library

import torch
import torch.nn as nn
import torch.optim as optim

Continuous Bag-of-Words Model

class CBOW(nn.Module):
    """Toy continuous bag-of-words model.

    The context word indices are embedded, the embeddings are summed over the
    context axis, a softmax is applied over the embedding features, and a
    linear layer (plus an extra learnable bias) maps the result to one score
    per output class.

    Args:
        input_layer: number of rows in the embedding table (vocabulary size).
        hidden_layer1: embedding width.
        hidden_layer2: accepted for interface compatibility; not used.
        output_layer: number of output scores per sample.
    """

    def __init__(self,input_layer, hidden_layer1, hidden_layer2, output_layer):
        super().__init__()
        self.E = nn.Embedding(input_layer, hidden_layer1)
        self.W = nn.Linear(hidden_layer1, output_layer)
        # Extra bias initialised to ones; it is added in forward() on top of
        # the bias nn.Linear already carries by default.
        self.b = nn.Parameter(torch.ones(output_layer))

        # Explicit feature axis: a bare nn.Softmax() falls back to the
        # deprecated implicit-dimension choice and emits a warning. dim=-1
        # selects the same axis as that fallback for 1-D and 2-D inputs.
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Return raw (un-normalised) scores for a batch of context indices.

        Args:
            x: integer tensor of word indices; axis 1 is summed away after the
               embedding lookup, so a (batch, context) input yields
               (batch, output_layer) scores.
        """
        x = self.E(x)
        x = x.sum(1)

        res = self.W(self.softmax(x)) + self.b

        return res

Tokenizer Function

def tokenizer(sentence):
    """Build word->index and index->word tables from whitespace-split text.

    Args:
        sentence: iterable of strings; each string is split on whitespace.

    Returns:
        (dict_number, dict_word): indices are assigned in order of first
        appearance, starting at 0.
    """
    dict_number = {}
    dict_word = {}
    for line in sentence:
        for token in line.split():
            if token in dict_number:
                continue  # already has an index
            index = len(dict_number)
            dict_number[token] = index
            dict_word[index] = token

    return dict_number, dict_word

Making Data Function

def make_data(sentence,dict_number):
    """Turn each sentence into a (context indices, target index) pair.

    All words but the last form the context; the last word is the target.

    Args:
        sentence: iterable of whitespace-separated strings.
        dict_number: mapping from word to integer index.

    Returns:
        (X, Y): X is a list of context index lists, Y the list of target
        indices, in sentence order.
    """
    X, Y = [], []
    for line in sentence:
        ids = [dict_number[token] for token in line.split()]
        X.append(ids[:-1])
        Y.append(ids[-1])
    return X, Y

Main Function : Data

# Toy corpus of nine three-word sentences. make_data() in this section uses
# every word but the last as context and the last word as the target.
sentences = [
    'i love you',
    'i hate them',
    'i hit ball',
    'i kill mouse',
    'you love me',
    'you hate them',
    'you kill mouse',
    'they like us',
    'they are bad'
]

Main Function

# Build the vocabulary tables and the (context, target) training pairs.
dict_number, dict_word = tokenizer(sentences)
X,Y = make_data(sentences, dict_number)

# Embedding width 3. The third argument (hidden_layer2) is accepted by CBOW
# but never used inside the class.
model = CBOW(len(dict_number),3,3,len(dict_number))

# CrossEntropyLoss is fed the raw scores returned by the model.
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.1)

epochs = 10000

# LongTensor needs rectangular input; this works because every sentence in
# the corpus has exactly three words, so every context has two indices.
X,Y = torch.LongTensor(X),torch.LongTensor(Y)

# Full-batch training: each epoch is a single optimizer step over all pairs.
for epoch in range(epochs):
    Y_pred = model(X)
    loss = loss_fn(Y_pred, Y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # Report the loss every 1000 epochs.
    if (epoch + 1)% 1000 == 0:
        print(epoch+1, loss.float())

Test

def test_model(model,dict_number,sentence):
    """Print `sentence` followed by the word the model scores highest.

    Args:
        model: callable that maps a (1, context) LongTensor to scores.
        dict_number: mapping from word to integer index.
        sentence: whitespace-separated context words.

    Note: the index->word lookup reads the module-level `dict_word`.
    """
    context_ids = [dict_number[token] for token in sentence.split()]
    batch = torch.LongTensor([context_ids])
    scores = model(batch).data
    _, best = scores.max(1, keepdim=True)
    print(sentence, dict_word[best.squeeze().item()])

test_model(model, dict_number, 'you love')
# Sample output recorded by the author: you love me


Initial Skip_gram by using Pytorch


Library

import torch
import torch.nn as nn
import torch.optim as optim
import nltk
nltk.download('book', quiet=True)
from nltk.book import *

Skip-gram Model

class Skip_Gram(nn.Module):
    """Toy skip-gram model: one centre word in, one score per output word out.

    The word index is embedded, a softmax is applied over the embedding
    features, and a linear layer maps the result to `output_layer` scores.

    Args:
        input_layer: number of rows in the embedding table (vocabulary size).
        hidden_layer1: embedding width.
        output_layer: number of output scores per sample.
    """

    def __init__(self,input_layer, hidden_layer1, output_layer):
        super().__init__()
        self.E = nn.Embedding(input_layer, hidden_layer1)
        self.W = nn.Linear(hidden_layer1, output_layer)

        # Explicit feature axis: a bare nn.Softmax() falls back to the
        # deprecated implicit-dimension choice and emits a warning. dim=-1
        # selects the same axis as that fallback for 1-D and 2-D inputs.
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Return raw (un-normalised) scores for a tensor of word indices."""
        x = self.E(x)
        res = self.W(self.softmax(x))
        return res

Tokenizer Function

def tokenizer(sentence):
    """Assign each distinct word an index in order of first appearance.

    Args:
        sentence: iterable of strings; each string is split on whitespace.

    Returns:
        (dict_number, dict_word): word->index and index->word mappings.
    """
    dict_number = {}
    dict_word = {}
    for line in sentence:
        for word in line.split():
            # setdefault leaves an existing index alone and evaluates
            # len(dict_number) before inserting, so new words get the next id.
            index = dict_number.setdefault(word, len(dict_number))
            dict_word[index] = word

    return dict_number, dict_word

Making Data Function

def make_data(sentence,dict_number,num):
    """Build symmetric (input, target) index pairs for skip-gram training.

    For every distance 1..num and every pair of words that far apart in a
    sentence, both directions are emitted: (left -> right), (right -> left).

    Args:
        sentence: iterable of whitespace-separated strings.
        dict_number: mapping from word to integer index.
        num: maximum distance between paired words.

    Returns:
        (X, Y): flat lists of input indices and target indices.
    """
    X, Y = [], []
    for line in sentence:
        tokens = line.split()
        for gap in range(1, num + 1):
            # zip stops at the shorter sequence, i.e. after len(tokens)-gap pairs.
            for left_word, right_word in zip(tokens, tokens[gap:]):
                left = dict_number[left_word]
                right = dict_number[right_word]
                X.extend((left, right))
                Y.extend((right, left))
    return X, Y

Main Function : Data

# Raw text of 'austen-emma.txt' from the NLTK Gutenberg corpus.
emma_raw = nltk.corpus.gutenberg.raw('austen-emma.txt')
# Remove punctuation and digits in a single pass; str.translate avoids
# rebuilding the string once per character as a `+=` loop does.
new_s = emma_raw.translate(str.maketrans('', '', '".,_;?!1234567890[]-'))
# Keep the first 1000 lines; note these are text lines, not grammatical sentences.
sentence = new_s.split('\n')[:1000]

Main Function

# Build the vocabulary tables and the word pairs; make_data() pairs words
# that are up to 3 positions apart and emits both directions.
dict_number, dict_word = tokenizer(sentence)
X,Y = make_data(sentence, dict_number,3)
# Embedding width 10; one output score per vocabulary word.
model = Skip_Gram(len(dict_number),10,len(dict_number))

# CrossEntropyLoss is fed the raw scores returned by the model.
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.03)

epochs = 1000

X,Y = torch.LongTensor(X),torch.LongTensor(Y)

# Full-batch training: each epoch is a single optimizer step over all pairs.
for epoch in range(epochs):
    Y_pred = model(X)
    loss = loss_fn(Y_pred, Y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # Report the loss every 10 epochs.
    if (epoch+1) % 10==0:
        print(epoch+1, loss.float())

Test

def test_model(model,dict_number,word):
    """Print `word` followed by the word the model scores highest for it.

    Args:
        model: callable that maps a length-1 LongTensor to a row of scores.
        dict_number: mapping from word to integer index.
        word: the query word; must be a key of dict_number.

    Note: the index->word lookup reads the module-level `dict_word`.
    """
    query = torch.LongTensor([dict_number[word]])
    scores = model(query).data
    _, best = scores.max(1, keepdim=True)
    print(word, dict_word[best.squeeze().item()])

test_model(model, dict_number, 'enough')
# Sample output recorded by the author: enough and


Subsampling


Library

!pip install d2l==1.0.0-alpha1.post0
import collections
import math
import os
import random
import torch
import matplotlib.pyplot as plt
from d2l import torch as d2l

Read Data

# Register the PTB archive under the key 'ptb' in d2l.DATA_HUB as (URL, hash).
# The hex string is presumably the archive's SHA-1 checksum — confirm against d2l.
d2l.DATA_HUB['ptb'] = (d2l.DATA_URL + 'ptb.zip',
                       '319d85e578af0cdc590547f26231e4e31cdf1e42')

def read_ptb():
    """Return the PTB training file as a list of token lists, one per line.

    The archive registered under 'ptb' is fetched/extracted through
    d2l.download_extract, then 'ptb.train.txt' is split on newlines and each
    line on whitespace. A trailing newline in the file yields a final empty list.
    """
    train_path = os.path.join(d2l.download_extract('ptb'), 'ptb.train.txt')
    with open(train_path) as f:
        lines = f.read().split('\n')
    return [line.split() for line in lines]

# Tokenized PTB training lines (a list of word lists).
sentences = read_ptb()
# Drop the '<unk>' placeholder tokens from every line.
sentences = [[word for word in sentence if word!='<unk>'] for sentence in sentences]

Subsampling

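Each word $w_i$ is discarded with probability $P(w_i)$, where $f(w_i)$ is the word's relative frequency in the corpus and $t$ is a threshold (here $10^{-4}$):

$$P(w_i) = \max\left(1 - \sqrt{\frac{t}{f(w_i)}}, 0\right)$$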
def subsampling(sentences):
    """Randomly drop frequent words from a tokenized corpus.

    A word occurrence is kept when a fresh random.random() draw exceeds
    1 - sqrt(t / f(word)), where f(word) is the word's share of all tokens
    and t defaults to 1e-4.

    Args:
        sentences: sequence of token lists (iterated twice).

    Returns:
        (subsampled_sentences, subsampled_counter, counter): the filtered
        corpus, word counts after filtering, and word counts before filtering.
    """
    counter = collections.Counter()
    for sentence in sentences:
        counter.update(sentence)
    words_number = sum(counter.values())

    def probability(word,t=1e-4):
        # Despite the name this returns a bool: True means "keep this occurrence".
        keep_threshold = 1 - math.sqrt(t / counter[word] * words_number)
        return random.random() > keep_threshold

    subsampled_sentences = []
    subsampled_counter = collections.Counter()
    for sentence in sentences:
        kept = [word for word in sentence if probability(word)]
        subsampled_sentences.append(kept)
        subsampled_counter.update(kept)

    return subsampled_sentences, subsampled_counter, counter

# Subsample the corpus; keep the filtered lines plus the word counts after and before filtering.
subsampled_sentences, subsampled_counter, counter = subsampling(sentences)

Data Visualization

# One (count before, count after subsampling) pair per word, ordered by the
# count before subsampling.
data = sorted(
    ((counter[word], subsampled_counter[word]) for word in counter),
    key=lambda pair: pair[0],
)
X = [before for before, _after in data]
Y = [after for _before, after in data]

# Scatter plot of the subsampled frequency against the original frequency.
plt.scatter(X, Y, c='blue')
plt.xlabel('Vannila Word Frequency')
plt.ylabel('Subsampled Word Frequency')
plt.title('Subsampled Word Frequency by Vannila Word Frequency')
plt.show()


Word2Vec by Pytorch


Library and Data

!pip install d2l==1.0.0-alpha1.post0
from d2l import torch as d2l
import os

# Register the PTB archive under the key 'ptb' in d2l.DATA_HUB as (URL, hash).
# The hex string is presumably the archive's SHA-1 checksum — confirm against d2l.
d2l.DATA_HUB['ptb'] = (d2l.DATA_URL + 'ptb.zip',
                       '319d85e578af0cdc590547f26231e4e31cdf1e42')
def read_ptb():
    """Return the PTB training file as a list of token lists, one per line.

    The archive registered under 'ptb' is fetched/extracted through
    d2l.download_extract, then 'ptb.train.txt' is split on newlines and each
    line on whitespace. A trailing newline in the file yields a final empty list.
    """
    train_path = os.path.join(d2l.download_extract('ptb'), 'ptb.train.txt')
    with open(train_path) as f:
        lines = f.read().split('\n')
    return [line.split() for line in lines]
  
# Tokenized PTB training lines (a list of word lists).
sentences = read_ptb()

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import math
import collections

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device: {device}")

Skip-gram Model

class Skip_Gram(nn.Module):
    """Binary classifier over word pairs: does this pair co-occur?

    Both indices of a pair are embedded and summed, passed through a
    linear + ReLU layer, and reduced to a single sigmoid probability.

    Args:
        input_layer: number of rows in the embedding table (vocabulary size).
        hidden_layer1: embedding width.
        hidden_layer2: width of the hidden linear layer.
    """

    def __init__(self,input_layer, hidden_layer1,hidden_layer2):
        super().__init__()
        self.hidden_layer2 = hidden_layer2
        self.E = nn.Embedding(input_layer, hidden_layer1)
        self.W = nn.Linear(hidden_layer1, hidden_layer2)
        self.Q = nn.Linear(hidden_layer2, 1)

        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return one probability per row of `x`.

        Args:
            x: integer tensor of word indices; axis 1 is summed away after
               the embedding lookup.
        """
        pair_vector = self.E(x).sum(1)
        hidden = self.W(pair_vector).view(-1, self.hidden_layer2)
        logits = self.Q(self.relu(hidden)).view(-1)
        return self.sigmoid(logits)

Tokenizer Function

def tokenizer(sentence):
    """Assign each distinct token an index in order of first appearance.

    Args:
        sentence: iterable of token sequences (already split into words).

    Returns:
        (dict_number, dict_word): token->index and index->token mappings.
    """
    dict_number = {}
    for tokens in sentence:
        for word in tokens:
            # No-op for known words; len() is evaluated before insertion.
            dict_number.setdefault(word, len(dict_number))

    # Invert in insertion order, so both tables list entries in the same order.
    dict_word = {index: word for word, index in dict_number.items()}
    return dict_number, dict_word

Making Data Function

def make_data(sentence,dict_number,num):
    """Build labelled word-index pairs: co-occurring (1) and random (0).

    Positive pairs are words at most `num` positions apart in a sentence; each
    unordered pair is emitted once. The same number of negative pairs is then
    drawn uniformly at random from ordered index pairs that were not recorded
    as co-occurring (duplicates and w1 == w2 are allowed).

    Args:
        sentence: iterable of token sequences.
        dict_number: mapping from token to integer index; indices are expected
            to be 0..len(dict_number)-1, since negatives are drawn from that range.
        num: maximum distance between paired words.

    Returns:
        (X, Y, data_dict): X is a list of [w1, w2] index pairs, Y the matching
        0/1 labels, and data_dict maps each index to the set of indices it
        co-occurs with. If every ordered pair is already a positive, no
        negatives can be drawn and only the positives are returned.
    """
    data_dict = {i: set() for i in dict_number.values()}
    X = []
    Y = []
    for s in sentence:
        for i in range(1, num + 1):
            for wi in range(len(s) - i):
                left = dict_number[s[wi]]
                right = dict_number[s[wi + i]]
                # data_dict is kept symmetric, so one membership test covers both orders.
                if left not in data_dict[right]:
                    X.append([left, right])
                    Y.append(1)
                    data_dict[left].add(right)
                    data_dict[right].add(left)

    vocab_size = len(dict_number)
    # Ordered pairs (w1, w2) are rejected exactly when w1 is in data_dict[w2];
    # if none are left over, the sampling loop below could never finish.
    negatives_available = vocab_size * vocab_size - sum(len(v) for v in data_dict.values())
    if negatives_available <= 0:
        return X, Y, data_dict

    d_size = len(X) * 2
    while len(X) < d_size:
        w1, w2 = random.randrange(0, vocab_size), random.randrange(0, vocab_size)
        if w1 not in data_dict[w2]:
            X.append([w1, w2])
            Y.append(0)

    return X, Y, data_dict

Subsampling Function

def subsampling(sentences):
    """Randomly drop frequent words from a tokenized corpus.

    A word occurrence is kept when a fresh random.random() draw exceeds
    1 - sqrt(t / f(word)), where f(word) is the word's share of all tokens
    and t defaults to 1e-4.

    Args:
        sentences: sequence of token lists (iterated twice).

    Returns:
        (subsampled_sentences, subsampled_counter, counter): the filtered
        corpus, word counts after filtering, and word counts before filtering.
    """
    counter = collections.Counter()
    for sentence in sentences:
        counter.update(sentence)
    words_number = sum(counter.values())

    def probability(word,t=1e-4):
        # Despite the name this returns a bool: True means "keep this occurrence".
        keep_threshold = 1 - math.sqrt(t / counter[word] * words_number)
        return random.random() > keep_threshold

    subsampled_sentences = []
    subsampled_counter = collections.Counter()
    for sentence in sentences:
        kept = [word for word in sentence if probability(word)]
        subsampled_sentences.append(kept)
        subsampled_counter.update(kept)

    return subsampled_sentences, subsampled_counter, counter

# Subsample the PTB lines; `sentence` holds the filtered token lists used for training below.
sentence, subsampled_counter, counter = subsampling(sentences)

Main Function

# Build the vocabulary and the labelled pairs; make_data() pairs words up to
# 3 positions apart and adds an equal number of random negative pairs.
dict_number, dict_word = tokenizer(sentence)
X,Y,dict_check = make_data(sentence, dict_number, 3)
# Embedding width 100 and hidden width 100.
model = Skip_Gram(len(dict_number),100,100)
model = model.to(device)
# The model already applies a sigmoid, so plain BCELoss (not the logits variant) is used.
loss_fn = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.03)

X = torch.LongTensor(X).to(device)
Y = torch.LongTensor(Y).to(device)
epochs = 1000

# Full-batch training: each epoch is a single optimizer step over all pairs.
for epoch in range(epochs):
    Y_pred = model(X)
    # Y is stored as integers and cast to float32 here on every epoch for BCELoss.
    loss = loss_fn(Y_pred.to(torch.float32), Y.to(torch.float32))
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # Report the loss every 10 epochs.
    if (epoch+1) % 10==0:
        print(epoch+1, loss.float())

Test

def kNN(model,word,k):
    """Return the k words whose embeddings are closest to `word`'s embedding.

    Distance is the Euclidean distance between rows of `model.E`. The
    vocabulary and the target device come from the module-level
    `dict_number` and `device`.

    Args:
        model: object exposing an embedding layer as `model.E`.
        word: query word; must be a key of `dict_number` (KeyError otherwise).
        k: number of neighbours to return.

    Returns:
        A list of (distance, word) tuples sorted by ascending distance. When
        fewer than k other words exist the list is padded with
        (math.inf, None) entries so its length is always k. For k <= 0 an
        empty list is returned (the previous implementation raised IndexError).
    """
    if k <= 0:
        return []

    query_index = torch.LongTensor([dict_number[word]]).to(device)
    candidates = [w for w in dict_number if w != word]

    nearest = []
    if candidates:
        # No gradients are needed for a lookup; one batched embedding call
        # replaces a separate lookup and Python-level loop per word.
        with torch.no_grad():
            query_vec = model.E(query_index)
            candidate_index = torch.LongTensor(
                [dict_number[w] for w in candidates]
            ).to(device)
            diff = model.E(candidate_index) - query_vec
            # Accumulate in float64, as the original Python-float arithmetic did.
            distances = diff.double().pow(2).sum(1).sqrt().tolist()
        # sorted() is stable, so ties keep vocabulary order.
        nearest = sorted(zip(distances, candidates), key=lambda pair: pair[0])[:k]

    # Pad so callers always receive exactly k entries.
    nearest.extend((math.inf, None) for _ in range(k - len(nearest)))
    return nearest