[Python] Parallel processing does not work in my task

Discussion in 'Python' started by Stack, October 7, 2024 at 04:42.

    I use parallel processing for this task, and I use the pqdm package to show progress. However, the parallel processing does not seem to be working, even though I can confirm that CPU utilization is quite high (about 90%), since I use 60 worker processes. Is there anything wrong with my code?

    I tried to figure out whether pqdm impedes the parallel processing, or whether it runs fine and pqdm simply cannot show the progress bar. In the end I found that the parallel processing is actually not working: a version of the code that uses just one core runs slowly but smoothly. A sanity check for worker processes follows this paragraph, and a check for swallowed worker exceptions follows the full code listing.
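
    A minimal way to check whether separate worker processes are actually being used is a sketch along these lines (report_pid is only an illustrative name, not part of my task): if pqdm really fans work out to a pool, the output should contain more than one distinct process id.

    import os
    import time
    from pqdm.processes import pqdm

    def report_pid(_):
        time.sleep(0.5)      # keep each task alive long enough for workers to overlap
        return os.getpid()   # differs per worker process

    if __name__ == '__main__':
        pids = pqdm(range(16), report_pid, n_jobs=4)
        print(f"distinct worker pids: {len(set(pids))}")  # > 1 means true parallelism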


    import pandas as pd
    import sqlite3
    from fuzzywuzzy import fuzz
    import time
    import os
    import io        # needed for io.BytesIO below
    import zipfile   # needed for zipfile.ZipFile below
    import requests  # needed for requests.get below
    import multiprocessing
    from pqdm.processes import pqdm
    from functools import partial

    def download_and_process_geonames():
        # Download the GeoNames data for cities with population > 1000
        url = "http://download.geonames.org/export/dump/cities1000.zip"
        try:
            response = requests.get(url)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Error downloading the file: {e}")
            return

        try:
            z = zipfile.ZipFile(io.BytesIO(response.content))
        except zipfile.BadZipFile:
            print("The downloaded file is not a valid zip file.")
            return

        # Extract the file
        try:
            z.extractall()
        except Exception as e:
            print(f"Error extracting the zip file: {e}")
            return

        # Check if the file exists
        if not os.path.exists('cities1000.txt'):
            print("The extracted file 'cities1000.txt' does not exist.")
            return

        # Read the extracted file
        try:
            df = pd.read_csv('cities1000.txt', sep='\t', header=None,
                             names=['geonameid', 'name', 'asciiname', 'alternatenames', 'latitude', 'longitude',
                                    'feature_class', 'feature_code', 'country_code', 'cc2', 'admin1_code',
                                    'admin2_code', 'admin3_code', 'admin4_code', 'population', 'elevation',
                                    'dem', 'timezone', 'modification_date'])
        except Exception as e:
            print(f"Error reading the CSV file: {e}")
            return

        # Filter for European countries (you may need to update this list)
        european_countries = ['AD', 'AL', 'AT', 'BA', 'BE', 'BG', 'BY', 'CH', 'CY', 'CZ', 'DE', 'DK', 'EE',
                              'ES', 'FI', 'FR', 'GB', 'GR', 'HR', 'HU', 'IE', 'IS', 'IT', 'LI', 'LT', 'LU',
                              'LV', 'MC', 'MD', 'ME', 'MK', 'MT', 'NL', 'NO', 'PL', 'PT', 'RO', 'RS', 'RU',
                              'SE', 'SI', 'SK', 'SM', 'UA', 'VA', 'XK']
        european_cities = df[df['country_code'].isin(european_countries)]

        # Create SQLite database
        try:
            conn = sqlite3.connect('europe_cities.db')

            # Write to database
            european_cities[['name', 'country_code', 'latitude', 'longitude']].to_sql('cities', conn, if_exists='replace', index=False)

            conn.close()
        except Exception as e:
            print(f"Error creating or writing to the database: {e}")
            return

        # Clean up
        try:
            os.remove('cities1000.txt')
        except Exception as e:
            print(f"Error removing the temporary file: {e}")

        print("Database created successfully!")

    # Run the setup
    download_and_process_geonames()

    def get_coordinates(city_name, country_code, db_path):
        if pd.isna(city_name) or pd.isna(country_code) or city_name is None or country_code is None:
            return None, None, None, "Missing or invalid city name or country code"

        try:
            with sqlite3.connect(db_path) as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT name, latitude, longitude FROM cities WHERE country_code = ?", (country_code,))
                cities = cursor.fetchall()

                if not cities:
                    return None, None, None, f"No cities found for country code: {country_code}"

                best_match = max(cities, key=lambda x: fuzz.ratio(str(city_name).lower(), str(x[0]).lower()))

                if fuzz.ratio(str(city_name).lower(), str(best_match[0]).lower()) < 50:  # You can adjust this threshold
                    return None, None, None, f"No close match found for city: {city_name}"

                return best_match[1], best_match[2], best_match[0], None
        except Exception as e:
            return None, None, None, f"Error processing {city_name}, {country_code}: {str(e)}"

    def process_inventor(row, db_path):
        lat, lon, matched_city, reason = get_coordinates(row['inventor_city_name'], row['inventor_country_code'], db_path)
        return {**row.to_dict(), 'latitude': lat, 'longitude': lon,
                'matched_city': matched_city, 'matched_country_code': row['inventor_country_code'],
                'geocoding_reason': reason}

    def process_inventors(df, db_path, checkpoint_file):
        # Load checkpoint if exists
        if os.path.exists(checkpoint_file):
            processed_df = pd.read_csv(checkpoint_file)
            processed_ids = set(processed_df['inventor_id'])
            df = df[~df['inventor_id'].isin(processed_ids)]
        else:
            processed_df = pd.DataFrame()

        # Prepare the partial function for multiprocessing
        process_inventor_partial = partial(process_inventor, db_path=db_path)

        # Use pqdm for multiprocessing with progress bar
        results = pqdm(df.to_dict('records'), process_inventor_partial, n_jobs=60, desc="Processing inventors")

        # Combine results
        new_df = pd.DataFrame(results)
        processed_df = pd.concat([processed_df, new_df], ignore_index=True)

        return processed_df

    if __name__ == '__main__':
        start_time = time.time()

        # Load your data
        db_path = 'europe_cities.db'  # Path to your SQLite database
        checkpoint_file = 'geocoding_checkpoint.csv'

        # Load your unique_inventors DataFrame here
        # unique_inventors = pd.read_csv('your_input_file.csv')  # Uncomment and modify this line

        unique_inventors_processed = process_inventors(unique_inventors, db_path, checkpoint_file)

        # Save the final result
        unique_inventors_processed.to_csv('unique_inventors_processed_final.csv', index=False)

        end_time = time.time()
        processing_time = end_time - start_time

        print(f"Processing completed in {processing_time:.2f} seconds.")
        print(f"Total records processed: {len(unique_inventors_processed)}")
        print(f"Records with coordinates: {len(unique_inventors_processed[unique_inventors_processed['latitude'].notna()])}")
        print(f"Records without coordinates: {len(unique_inventors_processed[unique_inventors_processed['latitude'].isna()])}")
        print("The processed data has been saved to 'unique_inventors_processed_final.csv'.")
