1. Anuncie Aqui ! Entre em contato fdantas@4each.com.br

[Python] Fast API with asyncio | Running long running background tasks

Discussão em 'Python' iniciado por Stack, Outubro 7, 2024 às 02:53.

  1. Stack

    Stack Membro Participativo

    I'm running a heavy async task in FastAPI and need to return a response to the user while continuing the task in the background with asyncio. When I await the task, it works perfectly, but when I add it to the task loop, it randomly stops executing.

    Any insights on best practices for handling this?

    End to end function with abstractions below

    async def from_template_to_conv(
    transaction_id: str,
    db,
    ):
    print("Getting templates")
    transaction = await fetch_transaction_with_workstreams(db, transaction_id)
    if len(transaction.workstreams) > 0:
    print("Transaction already has workstreams. Aborting.")
    return

    # If transaction is not found, raise a 404 error
    print("Creating workstreams")
    response = await process_transaction_based_on_template(db, transaction=transaction)
    print("Workstream created successfully.")

    questions = await fetch_transaction_questions(db, transaction_id)
    question_dics = [
    {
    "id": q.id,
    "content": q.content,
    }
    for q in questions
    ]
    executor = ThreadPoolExecutor(max_workers=1)

    # Define the run_heavy_task function to process all questions
    async def run_heavy_task(transaction_id, questions, db, executor):

    for question in questions:
    transaction = await get_transaction(db, transaction_id)
    print(f"Fetched transaction {transaction_id}.")

    users = await get_users_by_organization(db, transaction.organization_id)
    print(f"Fetched users for transaction {transaction_id}.")
    user_ids = [u.id for u in users]
    print(user_ids)
    print(f"Automating questionnaire for question {question['id']}.")
    conversation = await create_automation_conversation(
    db, user_ids, "deals", transaction.organization_id, transaction
    )
    print(f"Created conversation for transaction {transaction_id}.")

    response = await process_message_conversation(
    question["id"],
    conversation,
    conversation.id,
    question["content"],
    user_ids,
    db,
    )
    print(f"Processed message for question {question['id']}.")

    final_message = None
    async for message in response.body_iterator:
    final_message = message

    if final_message:
    print(f"Final message received for question {question['id']}: {schema.Message.parse_raw(final_message)}")
    else:
    raise HTTPException(status_code=500, detail="Internal server error")

    # Create a single task to process all questions
    task = asyncio.create_task(run_heavy_task(transaction_id, question_dics, db, executor))

    # await task # Ensure the task is awaited and executed

    return response

    Continue reading...

Compartilhe esta Página