
I want to use a pretrained embedding model (fastText) in a PySpark application.

If I broadcast the model file (.bin), the following exception is thrown:

Traceback (most recent call last):
cPickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 2 GiB
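For context, the failing attempt looked roughly like this (a reconstruction, since the exact broadcast code isn't shown; sc is the SparkContext). Pickling the multi-gigabyte model blows past Python 2's 2 GiB serialization limit:

from gensim.models.fasttext import FastText as FT_gensim

# Hypothetical reconstruction: Spark pickles the broadcast value on the
# driver, and the serialized model is larger than 2 GiB.
model = FT_gensim.load_fasttext_format("/project/6008168/bib/wiki.en.bin")
bcast_model = sc.broadcast(model)  # raises cPickle.PicklingError / OverflowError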

Instead, I tried to use sc.addFile(modelpath), where modelpath = path/to/model.bin, as follows.

First, I created a file called fasttextSpark.py:

import gensim
from gensim.models.fasttext import FastText as FT_gensim
# Load model (loads when this library is being imported)
model = FT_gensim.load_fasttext_format("/project/6008168/bib/wiki.en.bin")

# This is the function we use in a UDF to get the embedding vector of a given word
def get_vector(msg):
    pred = model[msg]
    return pred

and testSubmit.sh:

#!/bin/bash
#SBATCH -N 2
#SBATCH -t 00:10:00
#SBATCH --mem 20000
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 32
module load python/2.7.14
source "/project/6008168/bib/ENV2.7.14/bin/activate"
module load spark/2.3.0
spark-submit /project/6008168/bib/test.py

and the test.py:

from __future__ import print_function
import sys
import time
import math
import csv
import datetime
import StringIO
import pyspark
import gensim
from operator import add
from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from gensim.models.fasttext import FastText as FT_gensim
appName = "bib"
modelpath = "/project/6008168/bib/wiki.en.bin"
conf = (SparkConf()
         .setAppName(appName)
         .set("spark.executor.memory", "12G")
         .set("spark.network.timeout", "800s")
         .set("spark.executor.heartbeatInterval", "20s")
         .set("spark.driver.maxResultSize", "12g")
         .set("spark.executor.instances", 2)
         .set("spark.executor.cores", 30)
         )
sc = SparkContext(conf = conf)
#model = FT_gensim.load_fasttext_format(modelpath)
sc.addFile(modelpath)
sc.addPyFile("/project/6008168/bib/fasttextSpark.py")

# Import our custom fastText language classifier lib
import fasttextSpark
print ("nights = ", fasttextSpark.get_vector("nights"))
print ("done")

Now each node will have a copy of the pretrained model. Some words are out of vocabulary, so each time I encounter such a word I want to create a random but fixed vector for it and add the word and its vector to a dictionary.

So, how can I maintain such a dictionary on each node?

Indeed, suppose my RDD is of the form my_rdd = (id, sentence) and I want to find the embedding vector of each sentence by summing up the vectors of its words. How many times will the embedding model be loaded? For example:

Suppose rdd = ("id1", "motorcycle parts"). Does my implementation load the model twice: once for "motorcycle" and once for "parts"? If so, my approach is inefficient; in that case, what would be the best approach to apply?
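For reference, here is roughly what I have in mind (a sketch; sentence_vector is a name invented for illustration):

import numpy as np
import fasttextSpark  # shipped to the workers via sc.addPyFile

def sentence_vector(sentence):
    # Sum the embedding vectors of the words in the sentence.
    return np.sum([fasttextSpark.get_vector(w) for w in sentence.split()], axis=0)

my_rdd = sc.parallelize([("id1", "motorcycle parts")])
sentence_vectors = my_rdd.mapValues(sentence_vector)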


1 Answer


Module-level variables in Python are evaluated once, when the module is first loaded. So the model will be loaded once per interpreter and kept alive as long as the interpreter is.
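A quick way to see this (slowmodule is a made-up name):

# slowmodule.py (hypothetical)
import time
print("loading slowmodule")  # runs once per interpreter, at import time
LOADED_AT = time.time()      # evaluated once, not on every use

# In the same interpreter:
#   import slowmodule   -> prints "loading slowmodule"
#   import slowmodule   -> prints nothing; slowmodule.LOADED_AT is unchanged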

However, Spark worker processes don't share memory, so there will be one copy of the dictionary per worker process. The same would be true if you used a broadcast variable.

So your current solution is as close to what you want as you can get, short of using low-level primitives (like memory mapping) or external storage.
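Following that reasoning, the out-of-vocabulary dictionary can live next to the model as a module-level variable in fasttextSpark.py. A minimal sketch, assuming gensim 3.x and numpy (oov_vectors and _fixed_random_vector are names invented here):

import hashlib
import numpy as np
from gensim.models.fasttext import FastText as FT_gensim

model = FT_gensim.load_fasttext_format("/project/6008168/bib/wiki.en.bin")
oov_vectors = {}  # module-level: one copy per worker interpreter

def _fixed_random_vector(word):
    # Seed the RNG from a stable hash of the word, so the "random" vector
    # is identical for a given word on every worker and in every run.
    seed = int(hashlib.md5(word.encode("utf-8")).hexdigest(), 16) % (2 ** 32)
    return np.random.RandomState(seed).rand(model.wv.vector_size)

def get_vector(word):
    try:
        return model[word]    # in vocabulary, or covered by subword ngrams
    except KeyError:          # no ngrams match: truly out of vocabulary
        if word not in oov_vectors:
            oov_vectors[word] = _fixed_random_vector(word)
        return oov_vectors[word]

Seeding from a stable hash (rather than Python's built-in hash) keeps the vectors identical across worker processes, so the separate per-worker dictionaries still agree with each other.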


3 Comments

But my solution loads the model for every entry in my_rdd. I want to avoid that and load the model once.
@bib What makes you think that?
Because each time I want to get the embedding vector of a word, the model-loading line (model = ...) will be executed.
