Panorama sur la Programmation réactive

Un des grands absents de l’article sur le panorama des technologies Web est la programmation réactive. Pour expliquer ce concept encore méconnu, Wikipedia en fait une définition plutôt abstraite « La programmation réactive est un paradigme de programmation visant à conserver une cohérence d'ensemble en propageant les modifications d'une source réactive (modification d'une variable, entrée utilisateur, etc.) aux éléments dépendants de cette source ». Nous n’allons pas ici en quelques lignes vous expliquer dans le détail le fonctionnement de la programmation réactive.

Un manifeste a été créé à ce sujet, des centaines d’articles ou présentations sont disponibles aujourd’hui à travers le Web. En revanche, nous allons essayer d’en faire un résumé permettant au béotien d’en comprendre les enjeux pour les années à venir. Par ailleurs, tout comme l’article précédent, l’idée est d’aborder l’aspect « fragmentation » dont n’échappe malheureusement pas la programmation réactive.

Qu’est-ce que c’est en deux mots ?

La programmation réactive part du constat que les ordinateurs sont devenus de plus en plus puissants mais que la complexité et la gourmandise des applications en ressources croit plus vite que les améliorations matérielles : parallélisation hardware, multi-cœurs, multi-cpu, mémoires SSD, etc… La faute à qui ? Pour donner une réponse simple, la faute réside dans la manière dont les applications sont programmées. Nous ne parlons pas ici des mauvais développeurs, mais des langages et environnements d’exécution qui ne permettent pas d’optimiser le temps CPU dans une logique impérative et séquentielle. Pour faire régulièrement des audits de code chez DNG, nous pouvons clairement chiffrer à 99,99% le nombre d’audit dont la conclusion face à des problèmes de performances est : « la réponse n’arrive pas dans un délai raisonnable car tous les threads sont en attente d’opérations d’entrées/sorties ». Les fameux I/O.

Or, les programmes d’aujourd’hui communiquent de plus en plus, que ce soit pour afficher une page Web (requêtes AJAX), lire un fichier ou lancer des requêtes SQL (ou NoSQL). Avec l’avènement des Microservices, les applications côté serveur se mettent aussi à discuter entre elles intensivement via des API (Rest ou autre). Dans un modèle multithreads, le système d’exploitation va solliciter son ordonnanceur de tâche (Scheduler) et passer d’un thread à l’autre (le fameux « context switch ») afin d’optimiser les ressources CPU. Mais globalement, la majeure partie du temps d’un processeur sera alloué à du « Idle Time », il attendra des ressources nécessaires à l’exécution de la tâche suivante. En se penchant un peu plus sur l’algorithme et la séquence de code d’un programme, on s’aperçoit qu’il est souvent possible d’anticiper certaines tâches non dépendantes de la tâche en cours. Il suffit de proposer au développeur un modèle dans lequel il devient, non seulement plus facile de paralléliser les tâches mais aussi plus facile de créer des applications disponibles, résilientes, souples et élastiques. Les 4 termes à la base du manifeste réactif. Nous dirons plus simplement que la programmation réactive est la programmation basée sur des flux de données asynchrones.

Cette programmation s’appuie sur des concepts empruntés à la programmation fonctionnelle (ce n’est pas nouveau) mais aussi aux capacités des langages (objets ou non) à proposer des mécanismes de pointeur de fonction et d’appels asynchrones.

Comment cela fonctionne-t-il ?

Le principe consiste à coder une application en partant du principe que toute source de données est par essence un flux. Que ce soit une variable, une collection (listes, map,…) ou un évènement participant à l’état d’une application. De la même manière qu’une programmation impérative s’appuie sur les mots clés du langage (for/each/if/switch..), la programmation réactive utilise des opérateurs pour connecter les flux entre eux. A l’aide d’opérations de base : filter(), map(), collect(), from(), join(), merge(), il est possible de combiner des flux, les calculer, les filtrer ou les réduire. Les adeptes de la programmation fonctionnelle auront reconnu des similarités. Et pour cause, l’origine de la programmation réactive provient essentiellement de concepts tirés des langages tels que Haskell.

Voici un exemple de code réactif. Nous avons ici une liste dont on souhaite extraire les valeurs commençant par « c » :

List<String> list =
        Arrays.asList("a", "b", "c");
list.stream()
        .filter(s -> s.startsWith("c"))
        .map(String::toUpperCase)
        .sorted()
        .forEach(System.out::println);

L’avantage principal de cette construction est de pouvoir paralléliser facilement les traitements mais aussi de chaîner des appels asynchrones pour résoudre élégamment le problème du « Callback Hell ». De plus en plus d’applications communiquent aujourd’hui via des appels asynchrones, le code lié à la gestion d’erreur et la propagation des valeurs renvoyées devient un vrai casse-tête.

La programmation réactive permet d’associer des flux à des appels asynchrones tout en chaînant d’autres flux avec d’autres appels. Dans ce processus type pipeline, la classe observeur souscrit à un observable (certains auront reconnu le célèbre pattern observer/observable). Lorsque des éléments sont émis dans le flux, l’observeur réagit automatiquement s’il s’est abonné aux évènements de ce flux. Cela permet de déporter de manière asynchrone l’attente liée à l’émission des données (socket réseau, lecture fichier, requête SQL, …) pour se focaliser sur les traitements associés à ces flux. Comme les flux ne sont jamais modifiés de manière concurrente (toute modification génère un nouveau flux), nous éliminons tout problème de contention ou de zone critique (variable partagée par plusieurs threads).  

Les promesses

Le modèle asynchrone étant un élément de base de la programmation réactive, ils nécessitent l’utilisation d’un procédé simple permettant de chaîner des traitements. C’est le rôle des Promesses. Une Promesse est une fonction qui joue le rôle d’intermédiaire pour invoquer un traitement.  Alors qu’une fonction synchrone renvoie la valeur cible demandée : T func(U) , une Promesse renvoie une fonction qui permettra à l’utilisateur d’en savoir un peu plus sur l’état de la requête ; en attente, succès, rejetée, etc…  Promise func(U). Une fois en possession de la Promesse, l’utilisateur demande explicitement l’exécution de la fonction cible avec une méthode particulière (ex : Promesse.get())

promesses

Comment se positionne les promesses dans le jargon RX ? On peut voir une promesse comme un Observable renvoyant une seule valeur, mais conceptuellement les flux permettent de manipuler plusieurs valeurs en provenance de multiple sources.

Dans le monde Java, l’implémentation la plus répandue des promesses est celle du JDK 7 mais surtout du JDK 8 avec respectivement les classes Future et CompletableFuture.

Future<String> appelSearch
        = executor.submit(new Callable<String>() {
    public String call() {
        return monService.search(parametre);
    }});
// On récupère la valeur de retour 
afficheValeur(appelSearch.get());

En réalité, la classe Future du JDK 7 n’est pas à proprement parler une Promesse car il n’est pas simple de la composer au travers d’un pipeline, d’où l’évolution proposée par le JDK 8 avec CompletableFuture. Cette dernière permet d’utiliser des opérateurs de type thenCompose(), thenApply(), thenAccept(), ou encore handle() pour la gestion d’erreur. Voici un exemple de Promesse en JDK 8.

CompletableFuture<Integer> requete = CompletableFuture.completedFuture(0);
requete.thenApply((val)->val+1)
       .thenApply((val)->val+2)
       .thenAccept((val)->{
            if (val==3) throw new RuntimeException("exception!");
        })
       .whenComplete((t,exception)->{
            System.out.println("valeur finale " + t);
            System.out.println("exception =" + exception);
        });

Les fonctions dans cet exemple sont très simples, elles ne font qu’ajouter la valeur « 1 » au paramètre. Lorsque la valeur est égale à « 3 » la fonction lève une exception qui sera gérée dans whenComplete().

Dans cet exemple, l’appel est synchrone. Pour réaliser un appel asynchrone, il suffit simplement de remplacer thenApply() par thenApplyAsync(). La classe CompletableFuture permet de faire beaucoup plus de choses, n’hésitez pas à vous référer à cet article.

Quels Framework sur le marché ?

C’est là où les choses se compliquent. Jusqu’à présent, nous avons essayé d’être conceptuels avec les notions de Promesses et d’Observable. Il faut savoir qu’il existe de nombreuses initiatives RX. La première, probablement la plus représentative est le manifeste réactif. L’idée de ce manifeste dans lequel on trouve plusieurs personnalités d’origine diverses, est de tracer les grandes lignes de la programmation réactive pour laisser ensuite chaque plateforme proposer sa propre implémentation. Microsoft a longtemps été fer de lance dans ce domaine avec l’introduction de Reactive Linq  (Reactive Extensions). Dans le monde Java, l’implémentation la plus connue est RXJava et dans une moindre mesure Akka. Dans d’autres langages, il en existe plus d’une quinzaine, que ce soit en Ruby, C++, PHP ou Swift. JavaScript a aussi ses aficionados à travers RxJS

Pour revenir au monde Java, la fragmentation évoquée dans le panorama Web précédent a frappé et nous avons aujourd’hui deux écoles : celle de RXJava, historique, fidèle aux concepts originels du manifeste RX et de l’autre côté le JDK 8 avec ses Streams.

Le code suivant illustre les deux implémentations : 

 List<Equipe> equipes = new ArrayList<>();
 Equipe france = new Equipe("france");
 france.add("gignac");
 france.add("benzema");
 france.add("varane");

 Equipe espagne = new Equipe("espagne");
 espagne.add("torres");
 espagne.add("iniesta");

 equipes.add(france);
 equipes.add(espagne);

 // Requête en utilisant la bibliothèque Stream du JDK 8
 List<String> listeDesJoueursFranceEtEspagneJDK = equipes.stream()
         .map(d -> d.getJoueurs())
         .flatMap(l -> l.stream())
         .collect(Collectors.toList());

 // Requête en utilisant la bibliothèque RXJava
List<String> listeDesJoueursFranceEtEspagneRXJava = rx.Observable.from(equipes)
         .map(d -> d.getJoueurs())
         .flatMap(l -> rx.Observable.from(l))
         .toList().toBlocking().single();

 // Renvoie [gignac, benzema, varane, iniesta, torres]
 System.out.println(listeDesJoueursFranceEtEspagneJDK);
 // Renvoie aussi [gignac, benzema, varane, iniesta, torres]
 System.out.println(listeDesJoueursFranceEtEspagneRXJava);

On peut voir que les opérateurs sont très proches même si l’implémentation est différente. Nous n’entrerons pas dans les détails de ces deux Framework (l’un étant orienté « Push » et l’autre « Pull »). Si vous souhaitez creuser le sujet, nous vous conseillons la lecture de cette excellente présentation de José Paumard.

La coexistence de ces deux Framework aux objectifs similaires, l’un plus historique, l’autre plus récent et peut-être aussi plus « accessible » ne simplifie pas toujours les choix techniques. Et pour enfoncer le clou, il existe à l’heure actuelle des travaux au sein d’Oracle  pour combler l’écart entre les JDK Streams et RxJava en implémentant une nouvelle API (ou plutôt une sorte d’extension) qui serait appelée les JDK Flow. Cette API proposerait les classes Flow.Publisher et Flow.Subscriber comme le fait RxJava avec son principe d’Observer. Pas sûr que cela simplifie réellement la compréhension du code, nous attendons d’en savoir un peu plus sur la tournure de cette fameuse JSR 166 dédiée au sujet.

Microsoft et Linq

La présence d’Erik Meijer (grand défenseur des concepts fonctionnels avec Haskell) n’est pas étrangère à l’implication de Microsoft, que ce soit au travers de Linq ou de RX.NET. Les streams du JDK 8 constituent un peu le pendant de Linq dans .NET. On peut regretter qu’il ait fallu attendre presque 10 ans avant que Java ne décide de fournir une telle fonctionnalité. En revanche, il faut souligner un élément important. Là où JDK 8 Stream est une implémentation plus ou moins réactive qui s’appuie fortement sur les collections du JDK, Linq propose des « extensions » du langage implémentées ensuite par des implémentations ; Linq to XML, Linq to Objects. Il existe même Linq to Entities permettant de requêter son Framework ORM SQL maison, Entity Framework.

Linq fonctionne à l’aide d’opérateurs et d’expressions implémentées par une source de données cible. Pour faire un parallèle (ou plutôt un raccourci) avec Java, c’est comme si le monde Java s’accordait sur la définition des opérateurs Filter(), Map()/ForEach(), Collector() pour les intégrer dans une API « abstraite » qui serait ensuite implémentée par Hibernate, MongoDb, Cassandra, Oracle, etc ….. Et pour pousser encore la philosophie d’un cran, les opérateurs et les expressions seraient directement intégrés au langage JAVA comme structure de contrôle. On en est loin mais cela démontre à quel point le langage JAVA évolue aujourd’hui à « contre-temps » par rapport à .NET.

A titre d’information, voici un exemple de code Linq, on aurait bien aimé remplacer HQL (Hibernate Query Language) ou JPQL (trop orienté « chaînes de caractères ») par des expressions de ce type :

Decimal totalDue = 200;
using (MaBaseDeDonnees session = new MaBaseDeDonnees()) {
    IQueryable<int> salesInfo =
        from s in session.SalesOrderHeaders
        where s.TotalDue >= totalDue
        select s.SalesOrderID;
 
    Console.WriteLine("Info de commande:");
    foreach (int orderNumber in salesInfo) {
        Console.WriteLine("Commande numero: " + orderNumber);                    
    }
 }

 Le mot clé « await »

Voici encore un autre domaine dans lequel Microsoft a été précurseur. La prochaine spécification de JavaScript ES7 va inclure un nouveau mot-clé « await ». Celui-ci jouera le rôle de sucre syntaxique permettant « d’asynchroniser » du code d’apparence synchrone. Tout ceci est basé sur les Promesses. On pourra écrire le code suivant :

async function write () {
  var txt = await read();
  console.log(txt);
}

Le futur des langages actuels consiste à généraliser l’usage des Promesses, soit pour faire du réactif soit simplement de l’asynchrone classique. Or, les Promesses restent encore verbeuses, l’idée du mot clé « await » consiste donc à rétablir la construction impérative originelle pour la réécrire ensuite avec des Promesses, le tout par génération de code. Cela suppose évidemment des contraintes, notamment sur les signatures des méthodes appelées par await. Dans le monde Microsoft, une Promesse est une tâche (classe Task). Dans le monde Java, il faudra accorder les violons entre d’un côté la classe Observable de RXJava, de l’autre la classe CompletableFuture du JDK et bientôt le JDK 9 Flow. D’où l’intérêt encore une fois des standards ! 

A titre d’information, « await » existe en C# et en VB.NET depuis 2010, cet article montre un cas d’usage.

Programmation réactive et Microservices

Avec la démocratisation des microservices, les applications seront de plus en plus amenées à communiquer. En Java, JAX-RS reste la référence pour tout ce qui touche à REST, cette API s’est considérablement « asynchronisée » depuis la version 2. Le code suivant, tiré de cet excellent article, illustre la puissance des Promesses lorsqu’il s’agit de mixer plusieurs appels asynchrones :

import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import java.util.concurrent.CompletableFuture;
    @Path("/")
    public class AsyncResource {
        @GET
        @Path("/userInfo/{user}")
        @Produces(MediaType.APPLICATION_JSON)
        public void userInfoAsync(@Suspended AsyncResponse asyncResponse,
                                  @PathParam("user") String user) {

            // On crée les deux appels asynchrones
            CompletableFuture<GitHubUser> gitHubFuture =
                Futures.toCompletable(gitHubService.userAsync(user), executor);
            CompletableFuture<FacebookUser> facebookFuture =
                Futures.toCompletable(facebookService.userAsync(user), executor);
            // Et on les combine, on peut très bien imaginer d'autres appels 
            gitHubFuture
                .thenCombine(facebookFuture, (g, f) -> new UserInfo(f, g))
                .thenApply(info -> asyncResponse.resume(info))
                .exceptionally(e -> asyncResponse.resume(
                    Response.status(INTERNAL_SERVER_ERROR).entity(e).build()));
            asyncResponse.setTimeout(1000, TimeUnit.MILLISECONDS);
            asyncResponse.setTimeoutHandler(ar -> ar.resume(
                Response.status(SERVICE_UNAVAILABLE).entity("Operation timed out").build()));
        }
    }

Ce code n’est pas pollué par les innombrables try/catch pour la gestion d’exception. La combinaison de deux appels asynchrones avec l’ordre thenCombine() permet de lire séquentiellement une construction asynchrone. Il suffit d’ajouter d’éventuels filtres ou opérations de map du JDK Stream pour disposer d’une vraie puissance d’expression. Cela change d’une lecture de code impératif avec des structures de contrôle de type boucles « for/each » ou « if ». Mais n’oublions pas qu’il est toujours possible de faire coexister code impératif et code plus « fonctionnel ».

A l’avenir, on s’attend à une généralisation du modèle réactif. Un accord sur un ensemble de classes minimales permettant aux outils de renvoyer des Promesses Java « standards » serait souhaitable. Bénéficier d’un Hibernate réactif nécessiterait d’avoir un JDBC réactif. Il faudrait un Spring réactif et même une spécification JEE réactive. La plupart des outils permettant de renvoyer ou gérer des flux de données commencent à être réactifs. Reste à résoudre la complexité technique de ces systèmes massivement synchrones. Le mot clé « await » et la génération de code sont un premier élément de réponse.

Conclusion

A travers cet article vous avez compris que les enjeux du modèle réactif sont énormes. Plus les applications communiquent avec des tiers externes, plus elles sont candidates au RX. Lorsque les besoins en performances sont importants, il faut un modèle de développement adapté. Si ce n’est pas la mort de l’impératif nous allons de plus en plus vers un mélange entre programmation objet/impérative et programmation fonctionnelle. Le JDK 8 a ouvert un pan entier de possibilités aujourd’hui non exploitées par les développeurs Java. Il y a probablement un problème de compétence mais aussi un blocage « philosophique ». Les morceaux de code illustrés dans cet article ne sont pas triviaux. Le développeur doit changer sa façon d’aborder la programmation. Parmi eux, les plus anciens auront peut-être plus de difficultés car ils sont pollués par l’expérience impérative. Les plus jeunes, eux, ont tout le temps d’apprendre le RX à l’école, et pas seulement au travers de langages purement fonctionnels tels que Scheme, ML ou Haskell.    

Sami Jaber
DNG Consulting