Dieser Blogeintrag entstand in Form des ersten Praktikums des Faches Artificial Intelligence an der Zürcher Hochschule für Angewandte Wissenschaften
2048 ist definitiv ein Spiel, das süchtig machen kann. Das Spielfeld besteht aus 16 Feldern mit den Zahlen aus der Folge:
Das Ziel ist, dass identische Zahlen zu einer neuen Zahl zusammengeschoben werden, wobei die neue Zahl der Summe der zusammengeschobenen Zahlen besteht. Das Spiel kann kostenlos unter https://play2048.co gespielt werden.
Im Rahmen des Faches Artificial Intelligence an der Zürcher Hochschule wurde uns ein Programm zu Verfügung gestellt, mit welchem das Spiel im Browser angesteuert werden kann. Die Challenge bestand darin, eine Heuristic AI sowie eine Search AI zu programmieren, welche ein gutes Ergebnis in diesem Spiel erzielen sollten.
Wenn wir das Spiel manuell gespielt haben, konnten wir meist nur etwa 16’000 Punkte erreichen und gewannen das Spiel selten. Dies konnten wir nicht auf uns sitzen lassen und die Challenge war schnell definiert - Die AI soll das Spiel für uns gewinnen. Und zwar immer.
Das Ziel der Heuristic AI ist, anhand des aktuellen Spielzustands den nächstbesten Zug vorauszusagen. Es ist explizit definiert, dass nicht mehrere Züge vorausberechnet sondern nur der aktuelle Spielzustand bewertet werden darf. Zu Beginn wurden nortiert, was alles hilfreich sein kann beim Spiel. Dabei wurden folgende Punkte evaluiert
Grosse Zahlen zusammenschieben ist wichtiger als kleine zusammenzuschieben
Zu Beginn wurden die obigen Punkte einzeln implementiert, jedoch brachte diese nicht den gewünschten Erfolg. Schliesslich haben wir uns für eine Kombination sämtlicher Punkte entschieden - und eine Tabelle aufgestellt, was wie viele Punkte bringen soll. Dies führte jedoch letzlich zu mehr Chaos als zum Ziel und es musste eine neue Lösung her. Statt auf Tabellen haben wir fortan auf Matrizen gesetzt. So wurden in der Matrize Gewichte definiert, welche mit dem aktuellen Spielstand anhand folgender Formel berechnet werden:
Die Matrize wurde mit folgenden Gewichtungen in 10 Spielen getestet:
Links Links oben Schlange Ecken
In einem nächsten Schritt wurde für jede Matrize verschiedene Transformationsmöglichkeiten berücksichtigt.
Beispiel Schlange:
def get_snake_score(board):
snake = [[15, 14, 13, 12], [8, 9, 10, 11], [7, 6, 5, 4], [0, 1, 2, 3]]
snake_score = [
numpy.array(snake),
numpy.flip(snake),
numpy.transpose(snake),
numpy.transpose(numpy.flip(snake)),
15 - numpy.array(snake),
15 - numpy.flip(snake),
15 - numpy.transpose(snake),
15 - numpy.transpose(numpy.flip(snake))
]
Aus den beiden Diagrammen ist ersichtlich dass die schlangenförmige Gewichtung mit den Transformationsmöglichkeiten als beste hervorgeht.
Die verschiedenen Herangehensweisen wurden nun in einer Methode zusammengefasst und um weitere Ideen ergänzt. Gemäss den obigen Diagrammen und weiteren Evaluierungen wurde jeder Methode ein Faktor zugewiesen, um die bewährten Methoden stärker zu berücksichtigen.
def get_best_move_from_scores(board):
all_scores = [
[get_snake_score, 1],
[get_left_score, 0.2],
[get_left_corner_score, 0.2],
[get_corner_score, 0.1],
[get_penalty_for_difference_between_neighbours, 0.01],
]
if nr_of_empty_tiles > 9:
all_scores.append([get_score_with_least_merged_tiles, 0.2])
elif nr_of_empty_tiles < 6:
all_scores.append([get_score_with_most_merged_tiles, 0.2])
else:
all_scores.append([get_score_with_highest_merged_tiles, 0.2])
moves = {DOWN: 0, RIGHT: 0, LEFT: 0, UP: 0}
for move in [UP, LEFT, RIGHT, DOWN]:
new_board = execute_move(move, board)
points = sum([score[0](new_board) * score[1] for score in all_scores])
moves[move] += points
return get_best_moves_sorted(moves)
Durch die Einschränkung, dass die Heuristic AI lediglich das aktuelle Brett und den nächsten Zug bewertet ist es mit unserer Lösung nicht möglich das Spiel zu gewinnen. Im Vergleich zur Variante mit zufällig gewähltem nächsten Zug hat sich unser Ansatz jedoch im Durchschnitt um 260% verbessert. Hebt man die Einschränkung auf und berücksichtigt einen weiteren Schritt ist es möglich das Spiel zu gewinnen.
Im Gegensatz zur Heuristic AI darf die Search AI die nächsten Züge vorausberechnen. Dazu kommt eine leicht abgeänderte Form des Expectimax-Algorithmus zum Einsatz. Dabei wird eine Baumstruktur mit allen möglichen Spiel-Situationen aufgebaut. Der oberste Node des Baums ist das aktuelle Spielfeld. Danach ist der Spieler am Zug und hat vier mögliche Aktionen zur Auswahl: UP
, DOWN
, LEFT
und RIGHT
. Danach kommt der Zufallsgenerator des Spiels zum Zugs, welcher mit einer Wahrscheinlichkeit von 90% eine 2 an einer freien Stelle platziert und zu 10% eine 4 an einer freien Stellen. Im Worst-Case, dies ist wenn nur 1 Feld besetzt ist, führt dies also zu neuen Kombinationen. Angenommen im Schnitt sind Felder belegt, dann ergeben sich bei einer Tiefe die folgende Anzahl an Ästen (wobei einem Durchgang für den Spieler und einen Durchgang für den Zufallsgenerator des Spiels entspricht):
Da dies schnell zu einer kombinatorischen Explosion führt, müssen geeignete Massnahmen getroffen werden, damit zum einen der Code genügend schnell läuft und zum anderen eine ausreichend tiefe Baumsuchtiefe erreicht werden kann.
Der ursprüngliche Code bestand aus einer Klasse Node, welcher einen Knoten im Baum repräsentierte. Nachfolgender Pseudocode zeigt die Struktur dieser Klasse:
class Node:
def __init__(self, parent, board):
self.parent = parent
self.board = board
def get_points(self):
return Heuristic.evaluate_board(board)
Der oberste Node im Baum war dabei das aktuelle Board, die unteren Nodes wurden für jeden möglichen Move sowie jede mögliche Zufallszahl erzeugt. Es musste aber festgestellt werden, dass der Code bereits ab einer Tiefe von langsam läuft.
Während einer Pause kam uns eine verrückte Idee - Wir könnten die Klasse Node
komplett weglassen und stattdessen mit rekursiven Aufrufen direkt den Stack nutzen - Ob dies schneller oder langsamer ist? Wir hatten keine Ahnung. Wir implementierten also einen rekursiven Aufruf und merkten uns, wer an der Reihe ist
GAME_AGENT
: Das Spiel ist am Zug, dieses platziert an jeder freien Stelle eine 2 oder eine 4GAMER_AGENT
: Der Spieler ist am Zug und kann zwischen den 4 Aktionen UP
, DOWN
, LEFT
und RIGHT
wählen.Pseudocode:
def calc_best_move(board, depth, agent):
if depth == 0:
return get_score(board)
if agent == GAME_AGENT:
points = 0
for empty_tilde in empty_tildes:
empty_fields += 1
new_board = board.copy()
new_board.where(empty_tilde) = 2
points += 0.9 * calc_best_move(new_board, depth - 1, GAMER_AGENT)
new_board = board.copy()
new_board.where(empty_tilde) = 4
points += 0.1 * calc_best_move(new_board, depth - 1, GAMER_AGENT)
return points / len(empty_tildes)
elif agent == GAMER_AGENT:
points = 0
for move in [LEFT, RIGHT, UP, DOWN]:
new_board = board.copy()
new_board = execute_move(move, new_board)
if not board_equals(board, new_board):
points = max(points, calc_best_move(new_board, depth - 1, GAME_AGENT))
return points
Der GAMER_AGENT
ruft also die GAME_AGENT
auf und umgekehrt und jedes mal wird die Tiefe um 1 dekrementiert. Sobald die Tiefe 0 erreicht wurde, werden die Punkte berechnet und zurückgegeben.
Diese Strategie mit dem Wechsel auf den Stack hat sich durchaus bezahlt gemacht. So konnte die Geschwindigkeit um markant gesteigert werden und die Suchtiefe von war fortan problemlos möglich.
Die Bewertung wie gut ein eine Konstellation auf dem Board ist erfolgt anhand einer Matrix-Gewichtung, welche aus zwei Teilen besteht. Zum einen gibt es Plus-Punkte wenn die Steine gut angeordnet sind, zum anderen einen Abzug falls die Steine nebeneinader ein grosses Delta aufweisen.
Definition einer guten Anordnung:
Der Penalty hingegen führt zu einem Abzug, wenn verschieden grosse Felder nebeneinander angeordnet sind:
def get_penalty_unequal_neighbours(board):
penalty = 0
for i in range(0, 3):
for j in range(0, 3):
penalty += abs(board[i][j] - board[i][j + 1]) * 2
penalty += abs(board[i][j] - board[i + 1][j]) * 2
return penalty
Der cProfiler zeigte jedoch auf, dass genau in dieser Bewertung unsere grösste Ineffizienz lag. Nach 60 Sekunden zeigte die Statistik folgende Werte an:
Name | Call Count | Time (ms) | Own Time(ms) |
---|---|---|---|
get_penalty_unequal_neighbours | 1118718 | 28555 | 27118 (44.2%) |
calc_best_move | 1258403 | 58984 | 7723 (12.6%) |
numpy.array | 1371177 | 4567 | 4567 (7.4%) |
numpy.ufunc | 1244866 | 3291 | 3291 (5.4%) |
get_points_value_distribution | 1118718 | 14425 | 2813 (4.6%) |
… | … | … | … |
Die betroffene Methode wurde wie folgt optimiert:
def get_penalty_unequal_neighbours(board):
penalty = numpy.sum(numpy.absolute(numpy.diff(board)))
penalty += numpy.sum(numpy.absolute(numpy.diff(numpy.swapaxes(board, 0, 1))))
return penalty
Danach wurde ein weiteres Profile aufgenommen und es wurde ersichtlich, dass der Code nun einiges effizienter ist:
Name | Call Count | Time (ms) | Own Time(ms) |
---|---|---|---|
diff | 1922146 | 13604 | 11533 (18.9%) |
calc_best_move | 1040854 | 60383 | 7182 (11.7%) |
numpy.ufunc | 2952499 | 7060 | 7060 (7.6%) |
numpy.array | 3021824 | 4639 | 4639 (6.2%) |
get_penalty_unequal_neighbours | 961073 | 34999 | 3822 (6.2%) |
… | … | … | … |
depth = 7 - heuristicai.get_number_of_free_fields(new_board)
if number_of_moves < 300:
if depth < 2: depth = 2
elif number_of_moves < 700:
if depth < 3:depth = 3
elif number_of_moves < 4500:
if depth < 4:depth = 4
else:
depth += 1
if depth < 5: depth = 5
Natürlich hätten wir die Heuristic noch weiter optimieren und dadurch sicherlich auch bessere Resultate erzielen können. Da es uns in diesem Praktikum jedoch wichtiger war neues zu erlernen entschieden wir uns dazu, die Heurstic AI und die Search AI beiseite zu legen und eine neue Herausforderung zu suchen. Nach einer kurzen Recherche im Internet stiessen wir auf Reinforcement Learning - und hatten keine Ahnung was das genau ist. Wikipedia definiert RL wie folgt:
Bestärkendes Lernen oder verstärkendes Lernen (englisch reinforcement learning) steht für eine Reihe von Methoden des maschinellen Lernens, bei denen ein Agent selbständig eine Strategie erlernt, um erhaltene Belohnungen zu maximieren. Dabei wird dem Agenten nicht vorgezeigt, welche Aktion in welcher Situation die beste ist, sondern er erhält zu bestimmten Zeitpunkten eine Belohnung, die auch negativ sein kann. Anhand dieser Belohnungen approximiert er eine Nutzenfunktion, die beschreibt, welchen Wert ein bestimmter Zustand oder Aktion hat.
Bei Reinforcement Learning handelt es sich um ein riesengrossen, sehr komplexes Gebiet und wir konnten uns in dieser kurzen Zeit zu wenig Einarbeiten, um hier eine umfangreiche und verlässliche Beschreibung anzubieten.
Wir setzten auf auf die Library Keras-RL, welche die Open Source Deep-Learning-Bibliothek Keras um Reinforcement Learning erweitert. Zu Keras-RL gibt es eine gute Dokumentation, natürlich findet man auch Beispiele und Tutorials. Wir mussten einiges an Zeit investieren, um den ersten Prototypen unseres 2048-RL-Algorithmus zu erstellen - nur um dann festzustellen, dass er zwar irgendwas macht, jedoch ganz bestimmt nicht das, was wir uns erhofft haben. Es war an der Zeit, dass wir uns von anderen Leuten mit mehr Wissen inspirieren lassen - und offen gesagt ganz schön viel von deren Code kopiert haben ohne jedes Detail zu verstehen. Wir wollen jedoch auch betonen, dass wir uns erst seit ein paar Wochen mit AI auseinandersetzen und dadurch eher einen Überblick erhalten wollen, wie so etwas gemacht werden kann. Es wäre natürlich wünschenswert, dass wir am Ende des Kurses unseren Code besser verstehen - und die nachfolgende Beschreibung korrigieren können.
Zu Beginn wird versucht ein bestehendes Model aus dem Speicher zu laden. Falls keines vorhanden ist, dann wird ein neues erstellt:
NUM_MATRIX = 16 # Anzahl Matrizen für Encoding des Spielfelds
STATES = 1 # 1 State, da fully observable
INPUT_MATRIX_SIZE = (4, 4) # Input in Neurales Netzwerk -> 4x4 Matrix
INPUT_SHAPE = (STATES, 4 + 4 * 4, NUM_MATRIX,) + INPUT_MATRIX_SIZE
processor = InputProcessor(num_one_hot_matrices=NUM_MATRIX)
NUM_DENSE_NEURONS_L1 = 1024 # Anzahl Neuronen im 1. Layer
NUM_DENSE_NEURONS_L2 = 512 # Anzahl Neuronen im 2. Layer
NUM_DENSE_NEURONS_L3 = 256 # Anzahl Neuronen im 3. Layer
ACTIVATION_FTN = 'relu'
ACTIVATION_FTN_OUTPUT = 'linear'
# Erstelle das Modell
model = Sequential()
model.add(Flatten(input_shape=INPUT_SHAPE))
model.add(Dense(units=NUM_DENSE_NEURONS_L1, activation=ACTIVATION_FTN))
model.add(Dense(units=NUM_DENSE_NEURONS_L2, activation=ACTIVATION_FTN))
model.add(Dense(units=NUM_DENSE_NEURONS_L3, activation=ACTIVATION_FTN))
model.add(Dense(units=NUM_ACTIONS_PRO_AGENT, activation=ACTIVATION_FTN_OUTPUT))
Der InputProcessor
ist eine eigene Klasse, welche die Aktionen auf dem Spielfeld verarbeitet, später mehr dazu. Danach wird versucht ein bestehendes Training aus dem Speicher zu laden, ansonsten wir ein neuer Speicher erstellt:
memory = SequentialMemory(limit=MEMORY_SIZE, window_length=STATES)
Danach wird der Agent erstellt. Die meisten Parameter haben wir dabei kopiert und können diese nicht im Detail erklären - falls wir nach dem Kurs dazu in der Lage sind, holen wir dies gerne nach.
TRAIN_POLICY = LinearAnnealedPolicy(EpsGreedyQPolicy(), attr='eps', value_max=0.05,
value_min=0.05, value_test=0.01, nb_steps=NUM_STEPS_ANNEALED)
TEST_POLICY = EpsGreedyQPolicy(eps=.01)
dqn = DQNAgent(model=model, nb_actions=NUM_ACTIONS_PRO_AGENT, test_policy=TEST_POLICY,
policy=TRAIN_POLICY, memory=memory, processor=processor,
nb_steps_warmup=NUM_STEPS_WARMUP, gamma=.99,
target_model_update=TARGET_MODEL_UPDATE,
train_interval=4, delta_clip=1.)
dqn.compile(Adam(lr=.00025), metrics=['mse'])
Danach werden verschiedene Callbacks gesetzt (diese werden nach jeder Episode aufgerufen), welche jedoch nur für Logs sowie dem Erstellen von Grafiken benötigt werden und wir hier nicht genauer darauf eingehen. Anschliessend wir das Training gestartet,
ENVIRONMENT_NAME = '2048'
environment = Game2048Env()
# Seed, damit reproduzierbar
random_seed = 123
random.seed(random_seed)
np.random.seed(random_seed)
environment.seed(random_seed)
dqn.fit(environment, callbacks=_callbacks, nb_steps=NUM_STEPS, visualize=False, verbose=0) # Starte Training
# Speichere die Gewichte nach dem Training
dqn.save_weights(weights_filepath, overwrite=True)
memory = (memory, memory.actions, memory.rewards, memory.terminals, memory.observations)
Am Ende werden sämtliche Daten gespeichert, damit das Training später wieder fortgesetzt werden kann.
Der weiter oben verwendete InputProcessor
erweitert die Processor
des rl.core
und ist eine Art Pre-Prozessor für den Input in das neurale Netzwerk. Der InputProcessor
berechnet alle möglichen Kombinationen für die nächsten zwei Schritte, welche durch den Spieler (Schieben in die Richtungen UP
, DOWN
, LEFT
, RIGHT
) und den Zufallsgenerator des Spiels (neue Zahl) vorgenommen werden können. Dies wird durch eine One-Hot-Encoding
Methode gemacht, wobei für jede mögliche Zahl eine Matrix erstellt wird und eine 0 anzeigt, dass an entsprechender Stelle die Zahl nicht vorkommt und eine 1, dass die Zahl an entsprechender Stelle vorhanden ist. Es gibt also die folgenden Matrizen:
Bei einem Spielfeld mit
hätten die Matrizen folgende Werte:
Diese Umwandlung wird durch folgenden Codeblock ausgeführt:
# Dict mit 2^n:n für alle möglichen Zahlen -> {2: 1, 4: 2, 8: 3, ..., 65536: 16}
self.table = {2 ** i: i for i in range(1, self.num_one_hot_matrices)}
self.table[0] = 0
def one_hot_encoding(self, grid):
grid_one_hot = np.zeros(shape=(self.num_one_hot_matrices, 4, 4))
for i in range(4):
for j in range(4):
element = grid[i, j]
grid_one_hot[self.table[element], i, j] = 1
return grid_one_hot
Des weiteren müssen diverse Funktionen von der KlasseProcessor
der rl.core
Library überschrieben werden. So beispielsweise die Funktionprocess_observation(self, observation)
, welche von der Keras-RL Library aufgerufen wird, um für jeden Obersvation-Zustand alle möglichen Zustände nach den nächsten beiden Schritten zu berechnen. Der Rückgabewert erfolgt dabei in der One-Hot-Encoding.
def process_observation(self, observation):
grids_after_player = self.get_grids_after_move_of_player(observation)
grids_after_chance = [] # nach Zufallsgenerator des Spiels
for grid in grids_after_player:
grids_after_chance.append(grid)
grids_temp = self.get_grids_after_move_of_player(grid)
for grid_temp in grids_temp:
grids_after_chance.append(grid_temp)
grids_list = np.array([self.one_hot_encoding(grid) for grid in
grids_after_chance])
return grids_list
Wir haben etwas mehr als 2500 Episoden laufen lassen, danach kamen wir an Speicherprobleme in der IDE, obwohl wir fast 17GB zugewiesen hatten. Da es doch sehr lange dauerte haben wir keine weiteren Läufe mehr vorgenommen und zeigen nachfolgenden gerne das Resultat:
Erhaltene Punkte pro Episode:
Höchster Block pro Episode:
Autoren: Luca Stähli und Pascal Sager