In [None]:
%pip install 'gymnasium[classic-control]' --upgrade

In [11]:
%matplotlib inline
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import clear_output
import sklearn.pipeline
import sklearn.preprocessing
from sklearn.kernel_approximation import RBFSampler
import sys

<img src="https://stanford-cs221.github.io/autumn2023/assignments/mountaincar/mountaincar.png" width="400">

On considère l'environnement *Mountain Car* où on contrôle une voiture dans une vallée avec pour but d'atteindre le sommet de droite. On dispose de 3 actions: accélérer vers la gauche (0), ne pas accélerer (1), accélérer vers la droite (2). Un état correspond à un couple $(m,v)\in \left[ - 1.2,.6 \right] \times \left[ - .07 ,.07 \right]$ correspondant à la position (l'abscisse) et la vitesse. L'état de départ d'une interaction ("épisode") est choisi aléatoirement mais toujours dans la vallée.

On pourra consulter la documentation officielle pour plus d'informations : https://gymnasium.farama.org/environments/classic_control/mountain_car/

In [3]:
env = gym.make("MountainCar-v0")

In [4]:
def render_policy(policy):
    local_env = gym.make("MountainCar-v0", render_mode='rgb_array')
    s = local_env.reset()[0]
    terminated = False
    truncated = False
    while not (terminated or truncated):
        a = policy(s)
        s, r, terminated, truncated , _ = local_env.step(a)
        clear_output(wait=True)
        plt.grid(False)
        plt.xticks([])
        plt.yticks([])
        plt.imshow(local_env.render())
        plt.show()
    plt.close()

La fonction `render_policy` ci-dessus permet de visualiser une politique durant un épisode, sous la forme d'une animation. L'argument `policy` doit être une fonction prenant en argument un état et renvoyant une action (choisie éventuellement de façon aléatoire).

*Question 1* : Visualiser la politique accélérant toujours vers la droite.

*Question 2* : Proposer une meilleure politique et la visualiser. Estimer la longueur moyenne d'un épisode lorsque cette politique est employée.

Pour le reste du TP, on oublie la description de l'environnement ainsi que l'intuition qu'on en a. On suppose qu'on a seulement la connaissance de l'ensemble des actions, de l'ensemble des états, et qu'on peut iteragir sans limite avec l'envionnement dans le cadre d'épisodes dont on ne choisit pas l'état initial.

L'ensemble d'états étant infini (ou en toute rigueur, fini mais de cardinal très élevé, si on considère que les valeurs sont de type float32), on souhaite construire un algorithme d'apprentissage par renforcement s'appuyant sur une famille paramétrique de fonctions valeur. 

On fait le choix d'une paramétrisation linéaire de fonctions
action-valeur en dimension 1200 $(q_w)_{w\in \mathbb{R}^{1200}}$:
$ q_w(s,a)=\phi(s,a)^{\top}\!w,$
où la *feature map* $\phi:\mathcal{S}\times \mathcal{A}\to \mathbb{R}^{1200}$
est construite de la façon suivante à l'aide d'une application
$\psi:\mathcal{S}\to \mathbb{R}^{400}$:

$ \phi(s,0)=(\psi(s)|0_{\mathbb{R}^{800}})^{\top} $

$ \phi(s,1)=(0_{\mathbb{R}^{400}}|\psi(s)|0_{\mathbb{R}^{400}})^{\top} $

$ \phi(s,2)=(0_{\mathbb{R}^{800}}|\psi(s))^{\top} $

On construit la fonction $\psi$ de la façon suivante,
en effectuant d'abord une normalisation (sur la base de 10000 exemples de valeurs d'état), puis en utilisant la fonction `RBFSampler` de `scikit-learn` qui permet d'approximer en dimension finie la `feature map` d'un noyau gaussien.

In [7]:
observation_examples = np.array([env.observation_space.sample() for x in range(10000)])
scaler = sklearn.preprocessing.StandardScaler()
scaler.fit(observation_examples)

# Used to convert a state to a featurizes represenation.
# We use RBF kernels with different variances to cover different parts of the space
featurizer = sklearn.pipeline.FeatureUnion([
        ("rbf1", RBFSampler(gamma=5.0, n_components=100)),
        ("rbf2", RBFSampler(gamma=2.0, n_components=100)),
        ("rbf3", RBFSampler(gamma=1.0, n_components=100)),
        ("rbf4", RBFSampler(gamma=0.5, n_components=100))
        ])
featurizer.fit(scaler.transform(observation_examples))

def psi(s):
    return np.squeeze(featurizer.transform(scaler.transform(s[np.newaxis,:])))

*Question 3* : Compléter les fonctions suivantes qui calculent $\phi(s,a)$ et $q_w(s,a)$ selon les définitions données plus haut. *On pourra faire appel à la fonction `np.concatenate`.*

In [8]:
def phi(s,a):
    # compléter

def q(s,a,w):
    # compléter

*Question 4* : En s'inspirant des TP précédents, écrire une fonction `draw_action_greedy_policy` qui tire une action selon une politique $\epsilon$-gloutonne par rapport à une fonction action-valeur $q_w$ de la famille paramétrique.

In [9]:
def draw_action_greedy_policy(s, w, eps=0):
    # compléter

*Question 5* : Implémenter un Q-learning (semi-gradient). Essayer plusieurs valeurs pour $\alpha$. Visualiser la politique obtenue au bout de 10 épisodes (puis 20, 50, 100, 200 épisodes). Tracer l'évolution de la longueur des épisodes.

In [13]:
gamma = .99
w = np.zeros(400*3)
n_episodes = 10
k = 0

for episode in range(n_episodes):
    sys.stdout.flush()

    s = env.reset()[0]

    terminated = False

    # on ignore le signal de troncation, afin d'aller au-delà de la limite des 200 étapes par épisode
    while not terminated:
        # compléter

        print("\rStep {} @ Episode {}/{}".format(k, episode + 1, n_episodes), end="")
        k += 1
env.close()

Step 10047 @ Episode 10/10 (-329.0)

*Question 6* : Même question avec SARSA (à n étapes) (semi-gradient).