1. Anuncie Aqui ! Entre em contato fdantas@4each.com.br

[Python] Issue With Regards to Speed of Repeated Backwards A*

Discussão em 'Python' iniciado por Stack, Outubro 1, 2024 às 03:23.

  1. Stack

    Stack Membro Participativo

    My program navigates through a 101x101 maze with myopic vision, storing what it knows to be blocked in a list. Hence the pathfinding method I am utilizing, A*, is repeated whenever an obstruction is encountered. I have done this in two ways: repeated forward A* and repeated backwards A*. However I notice that for repeated backward A*, a way more significant number of nodes are expanded. This is a bug I want to resolve. This program takes a frustratingly long amount of time to run and I don't know why and how to shave the time off significantly, especially since I want to further generate 50 mazes in total and iterate both search methods through all of them.

    import numpy as np
    import random

    blocked = set() #Stores the coordinates of all the found obstacles
    expanded = 0 #Stores the number of expanded nodes

    class Node:
    def __init__(self, index, prev, g, h):
    self.index = index #The pair that stores the indices of the area of the grid
    self.prev = prev #Another Node that stores the previous node
    self.g = g
    self.h = h

    def get_f(self):
    return (self.g + self.h)

    def set_prev(self, node):
    self.prev = node

    class BinHeap:
    def __init__(self):
    self.heap = []

    def isEmpty(self):
    if len(self.heap) == 0:
    return True
    return False

    def insert(self, node):
    self.heap.append(node)
    self.sortUp(len(self.heap) - 1)

    def getMin(self):
    return self.heap[0]

    def delMin(self):
    self.heap[0] = self.heap[-1]
    self.heap.pop()
    if not self.isEmpty():
    self.sortDown(0)

    def sortUp(self, index):
    parent = (index - 1) // 2
    while parent >=0 and (self.heap[parent].get_f() > self.heap[index].get_f() or (self.heap[parent].get_f() == self.heap[index].get_f() and self.heap[parent].g <= self.heap[index].g)): #Compares g values as a tiebreaker
    self.heap[index], self.heap[parent] = self.heap[parent], self.heap[index] #This will swap node reference
    index = parent
    parent = (index - 1) // 2

    def sortDown(self, index):
    child1, child2 = 2 * index + 1, 2 * index + 2
    currentIndex = index
    minimal = currentIndex
    isSmaller = True #Indicates if children with smaller f-values exist
    while((child1 < len(self.heap) or child2 < len(self.heap)) and isSmaller):
    isSmaller = False
    if(child1 < len(self.heap) and (self.heap[child1].get_f() < self.heap[minimal].get_f() or (self.heap[child1].get_f() == self.heap[minimal].get_f() and self.heap[child1].g >= self.heap[minimal].g))):
    minimal = child1
    if(child2 < len(self.heap) and (self.heap[child2].get_f() < self.heap[minimal].get_f() or (self.heap[child2].get_f() == self.heap[minimal].get_f() and self.heap[child2].g >= self.heap[minimal].g))):
    minimal = child2

    if minimal != currentIndex:
    self.heap[minimal], self.heap[currentIndex] = self.heap[currentIndex], self.heap[minimal]
    isSmaller = True
    currentIndex = minimal
    child1, child2 = 2 * currentIndex + 1, 2 * currentIndex + 2

    def generateWorld(): #Maze generation method: recursive backtracking.
    world = np.zeros((101, 101))
    directions = [(-2, 0), (2, 0), (0, -2), (0, 2)] #We jump by 2s to take into account the fact we first intialized everything as a wall(with values of 0) hence this creates a clear path

    #backtracking stack
    track = []

    #Check if in bounds
    def inBounds(x, y):
    return 0 < x < 100 and 0 < y < 100 #Notice we never access the areas on the border. Those will always remain as walls in this algorithm

    startX, startY = 1, 1
    world[startY][startX] = 1 #Start is marked as visited
    track.append((startX, startY))

    while len(track) != 0:
    x, y = track[-1]
    neighbors = []
    for dirx, diry in directions:
    adjx, adjy = x + dirx, y + diry
    if inBounds(adjx, adjy) and world[adjy][adjx] == 0:
    neighbors.append((adjx, adjy))

    if len(neighbors) != 0: #If we have unvisited neighbors, choose a random one to create a path to
    newX, newY = random.choice(neighbors)
    #Get the wall separating the current position and the neighbor and remove it
    wallX, wallY = (x + newX) // 2, (y + newY) // 2
    world[wallY][wallX] = 1
    world[newY][newX] = 1
    track.append((newX, newY)) #Add the neigbor onto the stack
    else: #If we hit a dead-end we backtrack
    track.pop()
    # New code to output the world to a text file
    with open('generated_world.txt', 'w') as f:
    for row in world:
    f.write(''.join(['#' if cell == 0 else '.' for cell in row]) + '\n')

    print("World generated and saved to 'generated_world.txt'", flush=True)
    return world

    def forwardA(start, goal, backwards): #This function does a single A* search. To be implemented within the main code for repeated A* search
    global expanded
    global blocked
    openlist = BinHeap()
    closedlist = set()
    startH = sum(abs(a - b) for a, b in zip(start, goal)) #Calculates manhattan distance. sum() returns the sum of all elements in an iterable, which here is our generator expression. Zip pairs the corresponding elements in each tuple as their own tuples
    startNode = Node(start, None, 0, startH)
    openlist.insert(startNode) #Adds start node to the open list

    directions = [(1, 0), (-1, 0), (0, 1), (0, -1)]
    #check if in bounds
    def isBounded(x, y):
    return 0 <= x < 101 and 0 <= y < 101
    #check if closedlist contains element
    def isContains(element):
    return element in closedlist

    #Will go through the map until we reach the goal
    goalNode = None
    #Finds a path from start to goal using A* search
    while (not openlist.isEmpty()) and (not isContains(goal)):
    #Acquires and pops the node with the minimum estimated distance to goal
    currentNode = openlist.getMin()
    currentIndex = currentNode.index
    closedlist.add(currentIndex)
    openlist.delMin()
    #If the current node is our goal, we store it in goalNode
    if currentIndex == goal:
    goalNode = currentNode
    #Look at its neighbors and add the presumably unblocked ones to the open list
    for dirx, diry in directions:
    x, y = currentIndex
    adjx = x + dirx
    adjy = y + diry
    if isBounded(adjx, adjy) and ((adjx, adjy) not in blocked) and (not isContains((adjx, adjy))):
    g = currentNode.g + 1
    h = sum(abs(a - b) for a, b in zip((adjx, adjy), goal))
    adjNode = Node((adjx, adjy), currentNode, g, h)
    openlist.insert(adjNode)

    #Derive path from goalNode. PS: This function can potentially also be modified to work with backwards A* by the choice on whether to invert the derived path or not
    path = [] #Stores a path from goal to start
    if goalNode:
    path.append(goalNode.index)
    node = goalNode
    while node.prev is not None:
    node = node.prev
    path.append(node.index)

    expanded += len(closedlist)
    corrected_path = path[::-1] #Reverses the path to get the order from start to goal
    #print(corrected_path)
    if backwards:
    return path
    return corrected_path












    if __name__ == "__main__":
    #Our main code goes in here
    world = generateWorld()
    #NOTES: We can make the default start position (1, 1) and the goal position a random unblocked area in the map
    #Make sure that when checking the corresponding position on the map, invert the points. Ex: (2, 3) corresponds to world[3][2]
    #As such, if you find an obstacle at world[y][x] add its coordinates to blocked as (x, y)

    #Forward Repeated A* Search
    blocks_travelled = -1 #starts at -1 to take into account the fact the start node is also included in the path travelled
    reps = 0
    breaks = 0
    start = (1, 1)
    #Choose goal randomly from free cells
    freeCells = np.argwhere(world == 1)
    freeCells = freeCells[~np.all(freeCells == (start[1], start[0]), axis=1)] #Excludes start from being a possible goal
    goalY, goalX = freeCells[np.random.choice(freeCells.shape[0])]
    goal = (goalX, goalY)
    #Directions and checking if in bounds
    directions = [(1, 0), (-1, 0), (0, 1), (0, -1)]
    def isBounded(x, y):
    return 0 <= x < 101 and 0 <= y < 101
    #Now the actual search
    isFound = False
    location = start #Current location in the map
    while not isFound:
    path = forwardA(location, goal, backwards=False) #A* search for the presumed shortest path
    if len(path) == 0:
    print("Clear path to goal does not exist")
    break
    reps += 1
    for x, y in path:
    if (x, y) in blocked:
    blocks_travelled -= 1 #Accounts for the fact the current location will be included in the next path from A* and hence double counted
    breaks += 1
    break
    else:
    blocks_travelled += 1
    location = (x, y)
    for dirx, diry in directions: #Checks cells from our location's field of view
    adjx = x + dirx
    adjy = y + diry
    if isBounded(adjx, adjy) and world[adjy][adjx] == 0 and ((adjx, adjy) not in blocked):
    blocked.add((adjx, adjy))

    if location == goal:
    isFound = True
    #print(blocked)
    if isFound:
    print(f"Forward A*: Number of Blocks Travelled: {blocks_travelled}", flush=True)
    print(f"Forward A*: Number of Times A* Repeated: {reps}", flush=True) #This is just a debug statement for personal use
    #print(f"Number of Breaks: {breaks}")
    print(f"Forward A*: Number of Expanded Nodes: {expanded}", flush=True)

    #Backwards Repeated A*
    blocked.clear()
    blocks_travelled = -1
    reps = 0
    breaks = 0
    expanded = 0
    #Now the actual search
    isFound = False
    location = start #Current location in the map
    while not isFound:
    path = forwardA(goal, location, backwards=True) #A* search for the presumed shortest path
    if len(path) == 0:
    print("Clear path does not exist")
    break
    reps += 1
    for x, y in path:
    if (x, y) in blocked:
    blocks_travelled -= 1
    breaks += 1
    break
    else:
    blocks_travelled += 1
    location = (x, y)
    for dirx, diry in directions: #Checks cells from our location's field of view
    adjx = x + dirx
    adjy = y + diry
    if isBounded(adjx, adjy) and world[adjy][adjx] == 0 and ((adjx, adjy) not in blocked):
    blocked.add((adjx, adjy))

    if location == goal:
    isFound = True
    #print(blocked)
    if isFound:
    print(f"Backward A*: Number of Blocks Travelled: {blocks_travelled}", flush=True)
    print(f"Backward A*: Number of Times A* Repeated: {reps}", flush=True) #This is just a debug statement for personal use
    #print(f"Number of Breaks: {breaks}")
    print(f"Backward A*: Number of Nodes Expanded: {expanded}", flush=True)


    I suspect this issue with repeated backwards A* is a large contributor to why my search program runs slow. But I cannot figure out the cause of this bug. Help would be appreciated

    Continue reading...

Compartilhe esta Página