
[Python] How can I group a spark dataframe into rows no more than 50,000 and no more than 90mb...

Discussion in 'Python' started by Stack, October 1, 2024 at 03:23.

  1. Stack

    Stack Participating Member

    How can I group a Spark dataframe into partitions of no more than 50,000 rows and no more than 90 MB each?

    I've tried the following, but somehow I occasionally get partitions greater than 90 MB.

    from pyspark.sql.window import Window
    from pyspark.sql.functions import expr, length, sum as sql_sum, row_number, col, count
    import logging

    logger = logging.getLogger(__name__)  # logger is used below; minimal setup so the snippet runs standalone

    PARTITION_MB = 90
    ROW_LIMIT = 50000

    try:
        # Select required columns
        sdf = spark.table("table_name")

        # Calculate memory usage per row in MB (approximated via the JSON-serialized row length)
        sdf = sdf.withColumn("json_string", expr("to_json(struct(*))")) \
            .withColumn("memory_usage_per_row_MB", length("json_string") / 1024 / 1024) \
            .drop("json_string")

        # Window ordered by a generated id, used for row numbering and running totals
        window_spec = Window.orderBy(expr("monotonically_increasing_id()"))

        # Add row_number column to keep track of row order
        sdf = sdf.withColumn("row_number", row_number().over(window_spec))

        # Calculate cumulative memory usage and cumulative row count
        sdf = sdf.withColumn("cumulative_memory_MB", sql_sum("memory_usage_per_row_MB").over(window_spec)) \
            .withColumn("cumulative_row_count", row_number().over(window_spec))

        # Assign partition id based on memory and row limits
        sdf = sdf.withColumn(
            "partition_id",
            expr(f"""
                greatest(
                    floor(cumulative_memory_MB / {PARTITION_MB}),
                    floor((row_number - 1) / {ROW_LIMIT})
                )
            """)
        )

        # Validate partitions: total memory and row count per partition_id
        partition_counts = sdf.groupBy("partition_id").agg(
            sql_sum("memory_usage_per_row_MB").alias("partition_memory_MB"),
            count("*").alias("row_count")
        )

        # Count the number of distinct partitions
        num_partitions = sdf.select("partition_id").distinct().count()

        # Assert that all partitions meet both memory and row constraints
        assert partition_counts.filter(col("partition_memory_MB") <= PARTITION_MB).count() == partition_counts.count(), \
            f"Error: Some partitions exceed {PARTITION_MB} MB"

        assert partition_counts.filter(col("row_count") <= ROW_LIMIT).count() == partition_counts.count(), \
            f"Error: Some partitions exceed {ROW_LIMIT} rows"

        logger.info(f"Successfully partitioned dataframe by memory into {num_partitions} partitions, each less than {PARTITION_MB} MB and less than {ROW_LIMIT} rows.")

    except Exception as e:
        logger.error(f"Error partitioning dataframe by memory: {e}")
        raise Exception(e)
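
    As far as I can tell, the problem may be that a row whose cumulative total crosses a PARTITION_MB boundary is assigned to the next bucket and drags the straddled memory with it. Here is a minimal plain-Python illustration with made-up row sizes (not my real data) of how that bucket can end up over the cap under the same floor(cumulative / limit) rule:

    PARTITION_MB = 90
    row_sizes_mb = [50, 50, 50]  # hypothetical per-row sizes in MB

    cumulative = 0
    bucket_totals = {}
    for size in row_sizes_mb:
        cumulative += size
        bucket = cumulative // PARTITION_MB  # same rule as floor(cumulative_memory_MB / 90)
        bucket_totals[bucket] = bucket_totals.get(bucket, 0) + size

    print(bucket_totals)  # {0: 50, 1: 100} -> bucket 1 holds 100 MB, over the 90 MB cap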

