
[Python] Cosine similarity matrix with Pyspark MS Fabric SparkJob

Discussion in 'Python' started by Stack, October 5, 2024 at 21:32.

  1. Stack


    I am having some issues computing cosine similarities for a product recommender. I have an article database containing 40k articles, each with its description. I am trying to calculate the cosine similarity matrix for these elements so that, given any article, the top N most similar articles according to the description can be retrieved.

    I am developing this in Python and the code should run in a Microsoft Azure Function. As you can imagine, given the size of the initial dataframe, this cannot be run on that platform (it runs out of memory, as the cosine similarity matrix that is output is more than 8 GB in size on its own).
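
    For reference, a rough back-of-the-envelope on the dense matrix size (the 40k figure is approximate, so these numbers are only illustrative):

    n = 40_000  # approximate number of articles
    print(n * n * 4 / 1e9)  # dense float32 similarity matrix: ~6.4 GB
    print(n * n * 8 / 1e9)  # dense float64 (NumPy's default): ~12.8 GB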

    With this in mind, I have decided to take another approach: using Spark, more specifically a Spark Job definition from Microsoft Fabric. I have been reviewing similar questions (as my PySpark knowledge is extremely limited) like this one: Calculating cosine similarity in Pyspark Dataframe, but I'm struggling to make the code work in my situation.

    Trying to tackle the problems one by one, I have realized my code doesn't even get to the cosine similarity calculation (or at least not all of it), as my Spark job freezes when attempting the cross join of the dataframes, and I am unable to tell from the stderr logs what the exact issue is.

    I know my starting dataframe of 40k rows is quite big and that it implies a 40k x 40k cosine similarity matrix, but I tried this operation with plain Python in a Google Colab notebook and it was able to calculate it in around 10 minutes (admittedly I had to use the TPU backend, as the standard CPU runtime also crashes after exhausting its ~12 GB of memory). At this point I'm thinking either:

    - MS Fabric SparkJobs are also not a valid tool for my needs here, as they offer a "weak/capped" version of Spark.

    - My PySpark code clearly has something incorrect that is causing the Spark executor to be decommissioned.

    I need some help figuring out which scenario I am in (and, if it is the second, how to improve the code).

    Let me share some code. This is the Python code that actually works for the same task on a TPU Colab notebook (using sklearn imports):

    import json

    import numpy as np
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.metrics.pairwise import cosine_similarity

    # Keep only the relevant columns and clean the data
    articulos = articulos[['article_id', 'product_code', 'detail_desc']]
    articulos_unicos = articulos.drop_duplicates(subset=['product_code'])
    articulos_final = articulos_unicos.dropna(subset=['detail_desc'])
    articulos_final = articulos_final.reset_index(drop=True)

    # Bag-of-words features from the descriptions
    count = CountVectorizer(stop_words='english')
    count_matrix = count.fit_transform(articulos_final['detail_desc'])
    count_matrix = count_matrix.astype(np.float32)

    # Dense pairwise cosine similarity matrix
    similitud_coseno = cosine_similarity(count_matrix, count_matrix)

    # Exclude self-similarity so an article never recommends itself
    np.fill_diagonal(similitud_coseno, -1)

    top_5_simart = []

    for i in range(similitud_coseno.shape[0]):
        # Top 5 most similar articles for row i, sorted by descending similarity
        top_indices = np.argpartition(similitud_coseno[i], -5)[-5:]
        sorted_top_indices = top_indices[np.argsort(-similitud_coseno[i, top_indices])]
        top_5_simart.append(sorted_top_indices.tolist())

    with open('top_5_simart.json', 'w') as f:
        json.dump(top_5_simart, f)


    On the other hand, this is the PySpark code I am trying to implement:

    from pyspark.ml.feature import CountVectorizer, StopWordsRemover, Tokenizer

    # Selecting the necessary columns
    articulos = articulos.select("article_id", "product_code", "detail_desc")

    # Removing duplicates and rows without a description
    articulos = articulos.dropDuplicates(["product_code"]).dropna(subset=["detail_desc"])

    # Tokenizing the description text
    tokenizer = Tokenizer(inputCol="detail_desc", outputCol="words")
    articulos_tokenized = tokenizer.transform(articulos)

    # Removing stopwords
    remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    articulos_clean = remover.transform(articulos_tokenized)

    # Generating the feature column with CountVectorizer
    vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="features")
    vectorizer_model = vectorizer.fit(articulos_clean)
    articulos_final = vectorizer_model.transform(articulos_clean)
    articulos_final = articulos_final.select("article_id", "features")
    print(articulos_final.dtypes)
    print(articulos_final.schema)

    # Using crossJoin to obtain all the pairwise values for the cosine similarity
    articulos_final2 = articulos_final.withColumnRenamed("article_id", "article_id2").withColumnRenamed("features", "features2")
    articulos_final_cos_sim = articulos_final.crossJoin(articulos_final2)
    articulos_final.unpersist()
    articulos_final2.unpersist()
    articulos_final_cos_sim.write.mode('overwrite').format('delta').save(cspath)

    '''
    # Perform the crossJoin to obtain every pair of rows
    articulos_final_cos_sim = (
        articulos_final.alias("a")
        .crossJoin(articulos_final.alias("b"))
        .withColumn(
            "dot_product",
            F.col("a.features").dot(F.col("b.features"))  # Dot product of the vectors
        )
        .withColumn(
            "norm_a",
            F.expr("a.features.norm(2)")  # L2 norm of vector 'a'
        )
        .withColumn(
            "norm_b",
            F.expr("b.features.norm(2)")  # L2 norm of vector 'b'
        )
        .withColumn(
            "cosine_similarity",
            F.col("dot_product") / (F.col("norm_a") * F.col("norm_b"))  # Cosine similarity
        )
    )

    # Drop the columns that are no longer needed
    articulos_final_cos_sim = articulos_final_cos_sim.drop("dot_product", "norm_a", "norm_b")

    # Group/pivot to build the similarity matrix
    articulos_final_cos_sim = articulos_final_cos_sim.groupBy("a.article_id").pivot("b.article_id").sum("cosine_similarity")

    # Save the results
    articulos_final_cos_sim.write.mode('overwrite').format('delta').save(testpath2)

    # Filter to obtain the 5 most similar articles for each one
    windowSpec = Window.partitionBy("article_id").orderBy(F.col("cosine_similarity").desc())

    top_5_simart = articulos_final_cos_sim.withColumn("rank", F.row_number().over(windowSpec)).filter(F.col("rank") <= 5)

    # Save the results in JSON format
    top_5_simart.write.mode('overwrite').json(Top5SimartPath)
    '''



    As you can see, the last part of the code is commented out since (after a lot of attempts) it didn't give me any errors that made the execution fail; it simply took forever to run. The active part of the code only does the cross join (which already implies 40,000 x 40,000 = 1.6 billion output rows) and doesn't do any of the calculations yet; as I mentioned, it gets stuck there forever.
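
    For what it's worth, here is a minimal sketch of how I understand the dot product and norms would have to be expressed, reusing the dataframes defined above: as far as I can tell, Vector columns don't support .dot or .norm(2) directly in Spark SQL expressions, so the similarity itself would have to go through a UDF. The names below (cos_sim, pares) are only illustrative:

    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType

    # Cosine similarity between two pyspark.ml.linalg vectors, computed in Python
    @F.udf(DoubleType())
    def cos_sim(v1, v2):
        denom = float(v1.norm(2)) * float(v2.norm(2))
        return float(v1.dot(v2)) / denom if denom != 0.0 else 0.0

    # Pairwise similarities over the cross join of the two renamed dataframes
    pares = articulos_final.crossJoin(articulos_final2).withColumn(
        "cosine_similarity", cos_sim(F.col("features"), F.col("features2"))
    )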

    If needed I can also provide the stderr logs from Spark.

    Can I get some help with this? Thanks in advance!

