Close

2048 - Implementierung meiner ersten AI

Written on October 17, 2019 (last modified: October 17, 2019)

Dieser Blogeintrag entstand in Form des ersten Praktikums des Faches Artificial Intelligence an der Zürcher Hochschule für Angewandte Wissenschaften

2048 - Kann unsere AI für uns gewinnen?

2048 ist definitiv ein Spiel, das süchtig machen kann. Das Spielfeld besteht aus 16 Feldern mit den Zahlen aus der Folge:

Das Ziel ist, dass identische Zahlen zu einer neuen Zahl zusammengeschoben werden, wobei die neue Zahl der Summe der zusammengeschobenen Zahlen besteht. Das Spiel kann kostenlos unter https://play2048.co gespielt werden.

Im Rahmen des Faches Artificial Intelligence an der Zürcher Hochschule wurde uns ein Programm zu Verfügung gestellt, mit welchem das Spiel im Browser angesteuert werden kann. Die Challenge bestand darin, eine Heuristic AI sowie eine Search AI zu programmieren, welche ein gutes Ergebnis in diesem Spiel erzielen sollten.

Wenn wir das Spiel manuell gespielt haben, konnten wir meist nur etwa 16’000 Punkte erreichen und gewannen das Spiel selten. Dies konnten wir nicht auf uns sitzen lassen und die Challenge war schnell definiert - Die AI soll das Spiel für uns gewinnen. Und zwar immer.

Heuristic AI

Das Ziel der Heuristic AI ist, anhand des aktuellen Spielzustands den nächstbesten Zug vorauszusagen. Es ist explizit definiert, dass nicht mehrere Züge vorausberechnet sondern nur der aktuelle Spielzustand bewertet werden darf. Zu Beginn wurden nortiert, was alles hilfreich sein kann beim Spiel. Dabei wurden folgende Punkte evaluiert

  • Grösste Zahlen in den Ecken
  • Grosse Zahlen am Rand
  • Gleiche Zahlen nebeneinander
  • Zahlen der Grösse nach geordnet verhindert dass das Zusammenschieben blockiert wird
  • Grosse Zahlen zusammenschieben ist wichtiger als kleine zusammenzuschieben

  • Anzahl freie Felder
    • Mit vielen freien Felder ist es weniger Wahrscheinlich, dass das Spiel in den nächsten Zügen beendet wird. Dadurch wird jedoch auch der Zufälligkeitsbereich erhöht auf dem der neue Block gesetzt werden kann.
    • Desshalb sollte bei sehr vielen freien Felder das Mergen von Blöcken verzögert werden.

Zu Beginn wurden die obigen Punkte einzeln implementiert, jedoch brachte diese nicht den gewünschten Erfolg. Schliesslich haben wir uns für eine Kombination sämtlicher Punkte entschieden - und eine Tabelle aufgestellt, was wie viele Punkte bringen soll. Dies führte jedoch letzlich zu mehr Chaos als zum Ziel und es musste eine neue Lösung her. Statt auf Tabellen haben wir fortan auf Matrizen gesetzt. So wurden in der Matrize Gewichte definiert, welche mit dem aktuellen Spielstand anhand folgender Formel berechnet werden:

Die Matrize wurde mit folgenden Gewichtungen in 10 Spielen getestet:

​ Links Links oben Schlange Ecken

67145642-53ad1380-f283-11e9-8e2d-f2ad1f0fd290.png (902×527)

In einem nächsten Schritt wurde für jede Matrize verschiedene Transformationsmöglichkeiten berücksichtigt.

Beispiel Schlange:

def get_snake_score(board):
    snake = [[15, 14, 13, 12], [8, 9, 10, 11], [7, 6, 5, 4], [0, 1, 2, 3]]
    snake_score = [
        numpy.array(snake),
        numpy.flip(snake),
        numpy.transpose(snake),
        numpy.transpose(numpy.flip(snake)),
        15 - numpy.array(snake),
        15 - numpy.flip(snake),
        15 - numpy.transpose(snake),
        15 - numpy.transpose(numpy.flip(snake))
    ]

67145642-53ad1380-f283-11e9-8e2d-f2ad1f0fd290.png (902×527)

Aus den beiden Diagrammen ist ersichtlich dass die schlangenförmige Gewichtung mit den Transformationsmöglichkeiten als beste hervorgeht.

Die verschiedenen Herangehensweisen wurden nun in einer Methode zusammengefasst und um weitere Ideen ergänzt. Gemäss den obigen Diagrammen und weiteren Evaluierungen wurde jeder Methode ein Faktor zugewiesen, um die bewährten Methoden stärker zu berücksichtigen.

def get_best_move_from_scores(board):
    all_scores = [
        [get_snake_score, 1],
        [get_left_score, 0.2],
        [get_left_corner_score, 0.2],
        [get_corner_score, 0.1],
        [get_penalty_for_difference_between_neighbours, 0.01],
    ]

    if nr_of_empty_tiles > 9:
        all_scores.append([get_score_with_least_merged_tiles, 0.2])
    elif nr_of_empty_tiles < 6:
        all_scores.append([get_score_with_most_merged_tiles, 0.2])
    else:
        all_scores.append([get_score_with_highest_merged_tiles, 0.2])

    moves = {DOWN: 0, RIGHT: 0, LEFT: 0, UP: 0}
    for move in [UP, LEFT, RIGHT, DOWN]:
        new_board = execute_move(move, board)
        points = sum([score[0](new_board) * score[1] for score in all_scores])
        moves[move] += points
    return get_best_moves_sorted(moves)

Durch die Einschränkung, dass die Heuristic AI lediglich das aktuelle Brett und den nächsten Zug bewertet ist es mit unserer Lösung nicht möglich das Spiel zu gewinnen. Im Vergleich zur Variante mit zufällig gewähltem nächsten Zug hat sich unser Ansatz jedoch im Durchschnitt um 260% verbessert. Hebt man die Einschränkung auf und berücksichtigt einen weiteren Schritt ist es möglich das Spiel zu gewinnen.

67145642-53ad1380-f283-11e9-8e2d-f2ad1f0fd290.png (902×527)

Search AI

Im Gegensatz zur Heuristic AI darf die Search AI die nächsten Züge vorausberechnen. Dazu kommt eine leicht abgeänderte Form des Expectimax-Algorithmus zum Einsatz. Dabei wird eine Baumstruktur mit allen möglichen Spiel-Situationen aufgebaut. Der oberste Node des Baums ist das aktuelle Spielfeld. Danach ist der Spieler am Zug und hat vier mögliche Aktionen zur Auswahl: UP, DOWN, LEFT und RIGHT. Danach kommt der Zufallsgenerator des Spiels zum Zugs, welcher mit einer Wahrscheinlichkeit von 90% eine 2 an einer freien Stelle platziert und zu 10% eine 4 an einer freien Stellen. Im Worst-Case, dies ist wenn nur 1 Feld besetzt ist, führt dies also zu neuen Kombinationen. Angenommen im Schnitt sind Felder belegt, dann ergeben sich bei einer Tiefe die folgende Anzahl an Ästen (wobei einem Durchgang für den Spieler und einen Durchgang für den Zufallsgenerator des Spiels entspricht):

Da dies schnell zu einer kombinatorischen Explosion führt, müssen geeignete Massnahmen getroffen werden, damit zum einen der Code genügend schnell läuft und zum anderen eine ausreichend tiefe Baumsuchtiefe erreicht werden kann.

  • Code optimieren betreffend Effizienz
  • “Schlechte” Äste abschneiden
  • Äste mit Zügen welche nichts bewirken abschneiden
  • gute Heuristic zum abschätzen, wie gut der Ast ist

Code optimieren

Der ursprüngliche Code bestand aus einer Klasse Node, welcher einen Knoten im Baum repräsentierte. Nachfolgender Pseudocode zeigt die Struktur dieser Klasse:

class Node:
	def __init__(self, parent, board):
		self.parent = parent
		self.board = board

	def get_points(self):
		return Heuristic.evaluate_board(board)		

Der oberste Node im Baum war dabei das aktuelle Board, die unteren Nodes wurden für jeden möglichen Move sowie jede mögliche Zufallszahl erzeugt. Es musste aber festgestellt werden, dass der Code bereits ab einer Tiefe von langsam läuft.

Die Idee mit dem Stack

Während einer Pause kam uns eine verrückte Idee - Wir könnten die Klasse Node komplett weglassen und stattdessen mit rekursiven Aufrufen direkt den Stack nutzen - Ob dies schneller oder langsamer ist? Wir hatten keine Ahnung. Wir implementierten also einen rekursiven Aufruf und merkten uns, wer an der Reihe ist

  • Agent GAME_AGENT: Das Spiel ist am Zug, dieses platziert an jeder freien Stelle eine 2 oder eine 4
  • Agent GAMER_AGENT: Der Spieler ist am Zug und kann zwischen den 4 Aktionen UP, DOWN, LEFT und RIGHT wählen.

Pseudocode:

def calc_best_move(board, depth, agent):
    if depth == 0:
        return get_score(board)

    if agent == GAME_AGENT:
        points = 0
        for empty_tilde in empty_tildes:
            empty_fields += 1
            new_board = board.copy()
            new_board.where(empty_tilde) = 2
            points += 0.9 * calc_best_move(new_board, depth - 1, GAMER_AGENT)
            new_board = board.copy()
            new_board.where(empty_tilde) = 4
            points += 0.1 * calc_best_move(new_board, depth - 1, GAMER_AGENT)
        return points / len(empty_tildes)

    elif agent == GAMER_AGENT:
        points = 0
        for move in [LEFT, RIGHT, UP, DOWN]:
            new_board = board.copy()
            new_board = execute_move(move, new_board)
            if not board_equals(board, new_board):
                points = max(points, calc_best_move(new_board, depth - 1, GAME_AGENT))
        return points

Der GAMER_AGENT ruft also die GAME_AGENT auf und umgekehrt und jedes mal wird die Tiefe um 1 dekrementiert. Sobald die Tiefe 0 erreicht wurde, werden die Punkte berechnet und zurückgegeben.

Diese Strategie mit dem Wechsel auf den Stack hat sich durchaus bezahlt gemacht. So konnte die Geschwindigkeit um markant gesteigert werden und die Suchtiefe von war fortan problemlos möglich.

Optimierung der Bewertungsfunktion

Die Bewertung wie gut ein eine Konstellation auf dem Board ist erfolgt anhand einer Matrix-Gewichtung, welche aus zwei Teilen besteht. Zum einen gibt es Plus-Punkte wenn die Steine gut angeordnet sind, zum anderen einen Abzug falls die Steine nebeneinader ein grosses Delta aufweisen.

Definition einer guten Anordnung:

Der Penalty hingegen führt zu einem Abzug, wenn verschieden grosse Felder nebeneinander angeordnet sind:

def get_penalty_unequal_neighbours(board):
    penalty = 0
    for i in range(0, 3):
        for j in range(0, 3):
            penalty += abs(board[i][j] - board[i][j + 1]) * 2
            penalty += abs(board[i][j] - board[i + 1][j]) * 2
    return penalty

Der cProfiler zeigte jedoch auf, dass genau in dieser Bewertung unsere grösste Ineffizienz lag. Nach 60 Sekunden zeigte die Statistik folgende Werte an:

Name Call Count Time (ms) Own Time(ms)
get_penalty_unequal_neighbours 1118718 28555 27118 (44.2%)
calc_best_move 1258403 58984 7723 (12.6%)
numpy.array 1371177 4567 4567 (7.4%)
numpy.ufunc 1244866 3291 3291 (5.4%)
get_points_value_distribution 1118718 14425 2813 (4.6%)

Die betroffene Methode wurde wie folgt optimiert:

def get_penalty_unequal_neighbours(board):
    penalty = numpy.sum(numpy.absolute(numpy.diff(board)))
    penalty += numpy.sum(numpy.absolute(numpy.diff(numpy.swapaxes(board, 0, 1))))
    return penalty

Danach wurde ein weiteres Profile aufgenommen und es wurde ersichtlich, dass der Code nun einiges effizienter ist:

Name Call Count Time (ms) Own Time(ms)
diff 1922146 13604 11533 (18.9%)
calc_best_move 1040854 60383 7182 (11.7%)
numpy.ufunc 2952499 7060 7060 (7.6%)
numpy.array 3021824 4639 4639 (6.2%)
get_penalty_unequal_neighbours 961073 34999 3822 (6.2%)

Variable Suchtiefe

depth = 7 - heuristicai.get_number_of_free_fields(new_board)
    if number_of_moves < 300:
        if depth < 2: depth = 2
    elif number_of_moves < 700:
        if depth < 3:depth = 3
    elif number_of_moves < 4500:
        if depth < 4:depth = 4
    else:
        depth += 1
        if depth < 5: depth = 5

Reinforcement Learning

Natürlich hätten wir die Heuristic noch weiter optimieren und dadurch sicherlich auch bessere Resultate erzielen können. Da es uns in diesem Praktikum jedoch wichtiger war neues zu erlernen entschieden wir uns dazu, die Heurstic AI und die Search AI beiseite zu legen und eine neue Herausforderung zu suchen. Nach einer kurzen Recherche im Internet stiessen wir auf Reinforcement Learning - und hatten keine Ahnung was das genau ist. Wikipedia definiert RL wie folgt:

Bestärkendes Lernen oder verstärkendes Lernen (englisch reinforcement learning) steht für eine Reihe von Methoden des maschinellen Lernens, bei denen ein Agent selbständig eine Strategie erlernt, um erhaltene Belohnungen zu maximieren. Dabei wird dem Agenten nicht vorgezeigt, welche Aktion in welcher Situation die beste ist, sondern er erhält zu bestimmten Zeitpunkten eine Belohnung, die auch negativ sein kann. Anhand dieser Belohnungen approximiert er eine Nutzenfunktion, die beschreibt, welchen Wert ein bestimmter Zustand oder Aktion hat.

Bei Reinforcement Learning handelt es sich um ein riesengrossen, sehr komplexes Gebiet und wir konnten uns in dieser kurzen Zeit zu wenig Einarbeiten, um hier eine umfangreiche und verlässliche Beschreibung anzubieten.

Wir setzten auf auf die Library Keras-RL, welche die Open Source Deep-Learning-Bibliothek Keras um Reinforcement Learning erweitert. Zu Keras-RL gibt es eine gute Dokumentation, natürlich findet man auch Beispiele und Tutorials. Wir mussten einiges an Zeit investieren, um den ersten Prototypen unseres 2048-RL-Algorithmus zu erstellen - nur um dann festzustellen, dass er zwar irgendwas macht, jedoch ganz bestimmt nicht das, was wir uns erhofft haben. Es war an der Zeit, dass wir uns von anderen Leuten mit mehr Wissen inspirieren lassen - und offen gesagt ganz schön viel von deren Code kopiert haben ohne jedes Detail zu verstehen. Wir wollen jedoch auch betonen, dass wir uns erst seit ein paar Wochen mit AI auseinandersetzen und dadurch eher einen Überblick erhalten wollen, wie so etwas gemacht werden kann. Es wäre natürlich wünschenswert, dass wir am Ende des Kurses unseren Code besser verstehen - und die nachfolgende Beschreibung korrigieren können.

Keras-RL

Zu Beginn wird versucht ein bestehendes Model aus dem Speicher zu laden. Falls keines vorhanden ist, dann wird ein neues erstellt:

NUM_MATRIX = 16 # Anzahl Matrizen für Encoding des Spielfelds
STATES = 1  # 1 State, da fully observable
INPUT_MATRIX_SIZE = (4, 4)  # Input in Neurales Netzwerk -> 4x4 Matrix

INPUT_SHAPE = (STATES, 4 + 4 * 4, NUM_MATRIX,) + INPUT_MATRIX_SIZE
processor = InputProcessor(num_one_hot_matrices=NUM_MATRIX)

NUM_DENSE_NEURONS_L1 = 1024  # Anzahl Neuronen im 1. Layer
NUM_DENSE_NEURONS_L2 = 512  # Anzahl Neuronen im 2. Layer
NUM_DENSE_NEURONS_L3 = 256  # Anzahl Neuronen im 3. Layer
ACTIVATION_FTN = 'relu'
ACTIVATION_FTN_OUTPUT = 'linear'

# Erstelle das Modell
model = Sequential()
model.add(Flatten(input_shape=INPUT_SHAPE))
model.add(Dense(units=NUM_DENSE_NEURONS_L1, activation=ACTIVATION_FTN))
model.add(Dense(units=NUM_DENSE_NEURONS_L2, activation=ACTIVATION_FTN))
model.add(Dense(units=NUM_DENSE_NEURONS_L3, activation=ACTIVATION_FTN))
model.add(Dense(units=NUM_ACTIONS_PRO_AGENT, activation=ACTIVATION_FTN_OUTPUT))

Der InputProcessor ist eine eigene Klasse, welche die Aktionen auf dem Spielfeld verarbeitet, später mehr dazu. Danach wird versucht ein bestehendes Training aus dem Speicher zu laden, ansonsten wir ein neuer Speicher erstellt:

memory = SequentialMemory(limit=MEMORY_SIZE, window_length=STATES)

Danach wird der Agent erstellt. Die meisten Parameter haben wir dabei kopiert und können diese nicht im Detail erklären - falls wir nach dem Kurs dazu in der Lage sind, holen wir dies gerne nach.

TRAIN_POLICY = LinearAnnealedPolicy(EpsGreedyQPolicy(), attr='eps', value_max=0.05,
 				   value_min=0.05, value_test=0.01, nb_steps=NUM_STEPS_ANNEALED)
TEST_POLICY = EpsGreedyQPolicy(eps=.01)
dqn = DQNAgent(model=model, nb_actions=NUM_ACTIONS_PRO_AGENT, test_policy=TEST_POLICY,
      policy=TRAIN_POLICY, memory=memory, processor=processor,   
      nb_steps_warmup=NUM_STEPS_WARMUP, gamma=.99,
      target_model_update=TARGET_MODEL_UPDATE,
      train_interval=4, delta_clip=1.)

dqn.compile(Adam(lr=.00025), metrics=['mse'])

Danach werden verschiedene Callbacks gesetzt (diese werden nach jeder Episode aufgerufen), welche jedoch nur für Logs sowie dem Erstellen von Grafiken benötigt werden und wir hier nicht genauer darauf eingehen. Anschliessend wir das Training gestartet,

ENVIRONMENT_NAME = '2048'
environment = Game2048Env()

# Seed, damit reproduzierbar
random_seed = 123
random.seed(random_seed)
np.random.seed(random_seed)
environment.seed(random_seed)

dqn.fit(environment, callbacks=_callbacks, nb_steps=NUM_STEPS, visualize=False, verbose=0) # Starte Training

# Speichere die Gewichte nach dem Training
dqn.save_weights(weights_filepath, overwrite=True)
memory = (memory, memory.actions, memory.rewards, memory.terminals, memory.observations)

Am Ende werden sämtliche Daten gespeichert, damit das Training später wieder fortgesetzt werden kann.

Processors

Der weiter oben verwendete InputProcessor erweitert die Processor des rl.core und ist eine Art Pre-Prozessor für den Input in das neurale Netzwerk. Der InputProcessor berechnet alle möglichen Kombinationen für die nächsten zwei Schritte, welche durch den Spieler (Schieben in die Richtungen UP, DOWN, LEFT, RIGHT) und den Zufallsgenerator des Spiels (neue Zahl) vorgenommen werden können. Dies wird durch eine One-Hot-Encoding Methode gemacht, wobei für jede mögliche Zahl eine Matrix erstellt wird und eine 0 anzeigt, dass an entsprechender Stelle die Zahl nicht vorkommt und eine 1, dass die Zahl an entsprechender Stelle vorhanden ist. Es gibt also die folgenden Matrizen:

Bei einem Spielfeld mit

hätten die Matrizen folgende Werte:

Diese Umwandlung wird durch folgenden Codeblock ausgeführt:

# Dict mit 2^n:n für alle möglichen Zahlen -> {2: 1, 4: 2, 8: 3, ..., 65536: 16}
self.table = {2 ** i: i for i in range(1, self.num_one_hot_matrices)}
self.table[0] = 0

def one_hot_encoding(self, grid):
    grid_one_hot = np.zeros(shape=(self.num_one_hot_matrices, 4, 4))
    for i in range(4):
        for j in range(4):
            element = grid[i, j]
            grid_one_hot[self.table[element], i, j] = 1
    return grid_one_hot

Des weiteren müssen diverse Funktionen von der KlasseProcessor der rl.core Library überschrieben werden. So beispielsweise die Funktionprocess_observation(self, observation), welche von der Keras-RL Library aufgerufen wird, um für jeden Obersvation-Zustand alle möglichen Zustände nach den nächsten beiden Schritten zu berechnen. Der Rückgabewert erfolgt dabei in der One-Hot-Encoding.

    def process_observation(self, observation):
        grids_after_player = self.get_grids_after_move_of_player(observation)
        grids_after_chance = [] # nach Zufallsgenerator des Spiels
        for grid in grids_after_player:
            grids_after_chance.append(grid)
            grids_temp = self.get_grids_after_move_of_player(grid)
            for grid_temp in grids_temp:
                grids_after_chance.append(grid_temp)
        grids_list = np.array([self.one_hot_encoding(grid) for grid in
                              grids_after_chance])
        return grids_list

Resultat

Wir haben etwas mehr als 2500 Episoden laufen lassen, danach kamen wir an Speicherprobleme in der IDE, obwohl wir fast 17GB zugewiesen hatten. Da es doch sehr lange dauerte haben wir keine weiteren Läufe mehr vorgenommen und zeigen nachfolgenden gerne das Resultat:

Erhaltene Punkte pro Episode:

PascalPlot

myplot

Höchster Block pro Episode:

myplot2

myplot4

Autoren: Luca Stähli und Pascal Sager