Forum |  HardWare.fr | News | Articles | PC | S'identifier | S'inscrire | Shop Recherche
1228 connectés 

  FORUM HardWare.fr
  Programmation
  C++

  Thread Pool

 


 Mot :   Pseudo :  
 
Bas de page
Auteur Sujet :

Thread Pool

n°2149473
in_your_ph​ion
Posté le 16-07-2012 à 12:42:18  profilanswer
 

Bonjour,

 

J'essaie de faire un thread pool pour voir comment ça marche.
http://en.wikipedia.org/wiki/Thread_pool_pattern

 

Est ce que vous pourriez me dire si mon implémentation est correcte ...?

 

merci par avance

 

les tâches font rien de spécial, j'ai juste crée ça :

 
Code :
  1. class Task
  2. {
  3. public:
  4.     void execute() const
  5.     {
  6.         cout << "task does job ... " << endl;
  7.     }
  8. };
 

Pour le thread pool, j'ai crée une classe avec une queue qui contient les taches, et un vecteur pour les n threads :

 
Code :
  1. class ThreadPool
  2. {
  3. public:
  4.     ThreadPool(unsigned nbThreads  = 2) : _nbThreads( nbThreads )
  5.     {
  6.         pthread_mutex_init( & _mtx, 0 );
  7.         pthread_cond_init( & _cond, 0);
  8.     }
  9.    
  10.     void push( const Task & task );
  11.    
  12.     Task pop();
  13.     void initialize();
  14. private:
  15.    
  16.     unsigned _nbThreads;
  17.     pthread_mutex_t _mtx;
  18.     pthread_cond_t _cond;
  19.     queue<Task> _tasks;
  20.     vector<pthread_t> _threads;
  21. };
 


La fonction associée aux threads dépile les taches et execute la tâche courante :

Code :
  1. void * treat( void * data )
  2. {
  3.     ThreadPool * threadPool = (ThreadPool*)data;
  4.     while ( true )
  5.     {
  6.         Task t = threadPool->pop();
  7.         t.execute();
  8.     }
  9.     return (void*)0;
  10. }
 


voici le code complet :

 
Code :
  1. #include "pthread.h"
  2. #include <iostream>
  3. #include <queue>
  4. #include <vector>
  5. using namespace std;
  6. void * treat( void * data );
  7. class Task
  8. {
  9. public:
  10.     void execute() const
  11.     {
  12.         cout << "task does job ... " << endl;
  13.     }
  14.     const std::string operator << (int)
  15.     {
  16.         char tmp[16];
  17.         sprintf(tmp,"%s",this);
  18.         return string(tmp);
  19.     }
  20. };
  21. class ThreadPool
  22. {
  23. public:
  24.     ThreadPool(unsigned nbThreads  = 2) : _nbThreads( nbThreads )
  25.     {
  26.         pthread_mutex_init( & _mtx, 0 );
  27.         pthread_cond_init( & _cond, 0);
  28.     }
  29.    
  30.     void push( const Task & task );
  31.    
  32.     Task pop();
  33.     void initialize();
  34. private:
  35.    
  36.     unsigned _nbThreads;
  37.     pthread_mutex_t _mtx;
  38.     pthread_cond_t _cond;
  39.     queue<Task> _tasks;
  40.     vector<pthread_t> _threads;
  41. };
  42. void ThreadPool::push( const Task & task )
  43. {
  44.     pthread_mutex_lock( & _mtx );
  45.     cout << "push new data on queue" << endl;
  46.     _tasks.push( task );
  47.     pthread_cond_signal( & _cond );
  48.     pthread_mutex_unlock( & _mtx );
  49. }
  50. Task ThreadPool::pop()
  51. {
  52.     pthread_mutex_lock( & _mtx );
  53.     cout << "ThreadPool::pop(), tid:" << *(int*)&pthread_self() << endl;
  54.     while ( _tasks.empty() )
  55.         pthread_cond_wait( & _cond , & _mtx );
  56.     Task t = _tasks.front();
  57.     _tasks.pop();
  58.     pthread_mutex_unlock( & _mtx );
  59.    
  60.     return t;
  61. }
  62. void ThreadPool::initialize()
  63. {
  64.     cout << "initialize " << _nbThreads << " threads" << endl;
  65.     for ( int i = 0 ; i < _nbThreads ; ++i )
  66.     {
  67.         pthread_t thread;
  68.         _threads.push_back( thread );
  69.     }
  70.     vector<pthread_t>::iterator it = _threads.begin();
  71.     for ( ; it != _threads.end() ; ++ it)
  72.     {
  73.         pthread_create( &(*it), 0 , treat , this );
  74.     }
  75. }
  76. void * treat( void * data )
  77. {
  78.     ThreadPool * threadPool = (ThreadPool*)data;
  79.     while ( true )
  80.     {
  81.         Task t = threadPool->pop();
  82.         t.execute();
  83.     }
  84.     return (void*)0;
  85. }
  86. int main()
  87. {
  88.      ThreadPool tp(2);
  89.      tp.initialize();
  90.      vector<Task> someTasks(10);
  91.      vector<Task>::iterator it = someTasks.begin();
  92.      for ( ; it != someTasks.end() ; ++ it )
  93.      {
  94.          tp.push( *it );
  95.      }
  96.      cin.get();
  97.      return 0;
  98. }
 


ça vous parait bien ou y a t-il une erreur de design ou de conception ?

 

merki


Message édité par in_your_phion le 16-07-2012 à 14:59:31
mood
Publicité
Posté le 16-07-2012 à 12:42:18  profilanswer
 

n°2149530
theshockwa​ve
I work at a firm named Koslow
Posté le 16-07-2012 à 17:07:03  profilanswer
 

Tes locks devraient être faits en RAII, tu verras, ca allège pas mal le code dès que tu as des fonctions un peu complexes.
 
Je ne suis pas fan du while(true) dans la fonction treat. Globalement, killer des threads, c'est pas très joli, et avec la manière dont c'est architecturé, là, t'as pas trop le choix.
 


---------------
last.fm
n°2149544
in_your_ph​ion
Posté le 16-07-2012 à 19:08:35  profilanswer
 

theshockwave a écrit :

Tes locks devraient être faits en RAII, tu verras, ca allège pas mal le code dès que tu as des fonctions un peu complexes.

 

Je ne suis pas fan du while(true) dans la fonction treat. Globalement, killer des threads, c'est pas très joli, et avec la manière dont c'est architecturé, là, t'as pas trop le choix.

 


 

hello !
merci pour tes réponses, pour les locks/unlocks en fait j'ai eu un peu la flemme de les faire en RAII  :jap:

 

sinon, comment je pourrais architecturer différement pour ne pas avoir à killer mes threads ...? il y a une technique pour ça ?

 

autrement, comment on choisis le nombre de threads ? J'ai essayé avec plusieurs valeurs, même jusqu'à 512 lol. ça doit être combien généralement ..?

 

merci  ;)

 

ps : par ailleurs, pourquoi les "cout" s'affichent souvent un peu n'importe comment, alors que pas les printf ? (pour les mêmes messages)

Message cité 1 fois
Message édité par in_your_phion le 16-07-2012 à 19:10:33
n°2149545
theshockwa​ve
I work at a firm named Koslow
Posté le 16-07-2012 à 19:20:38  profilanswer
 

Ca dépend complètement de tes besoins.
Si tu vois ton système comme un job manager (Edit : micro tâches pour faire une distributions en découpant au max tes lourdes tâches), tu vas probablement vouloir avoir autant de threads à dispositions que tu as de threads hardware sur ta machine.
Si tu vois ca comme juste la possibilité de faire du "Fire and Forget" sur tes tâches (Edit : tâches lourdes) pour un max de client, peut-être que tu en voudras plus quitte à ralentir ta machine juste pour que le boulot pour chaque client commence au plus tôt ...
 
Dans le premier cas, tu voudras aussi probablement une interface qui te permette d'ajouter N tâches d'un coup. Dans l'autre, probablement pas.
 
Edit : les cas que je liste ne sont pas nécessairement les seuls que tu rencontreras, évidemment.

Message cité 1 fois
Message édité par theshockwave le 16-07-2012 à 19:22:19

---------------
last.fm
n°2149616
in_your_ph​ion
Posté le 17-07-2012 à 10:28:31  profilanswer
 

theshockwave a écrit :

Ca dépend complètement de tes besoins.
Si tu vois ton système comme un job manager (Edit : micro tâches pour faire une distributions en découpant au max tes lourdes tâches), tu vas probablement vouloir avoir autant de threads à dispositions que tu as de threads hardware sur ta machine.
Si tu vois ca comme juste la possibilité de faire du "Fire and Forget" sur tes tâches (Edit : tâches lourdes) pour un max de client, peut-être que tu en voudras plus quitte à ralentir ta machine juste pour que le boulot pour chaque client commence au plus tôt ...
 
Dans le premier cas, tu voudras aussi probablement une interface qui te permette d'ajouter N tâches d'un coup. Dans l'autre, probablement pas.
 
Edit : les cas que je liste ne sont pas nécessairement les seuls que tu rencontreras, évidemment.


 
hello !
ok merci pour tes réponses. Quand tu parles de "threads hardware", ce sont en fait les coeur du CPU non ?
sinon, à partir de quand une tache deviens "lourde" ?  [:tinostar]

n°2149629
theshockwa​ve
I work at a firm named Koslow
Posté le 17-07-2012 à 11:20:48  profilanswer
 

in_your_phion a écrit :


 
hello !
ok merci pour tes réponses. Quand tu parles de "threads hardware", ce sont en fait les coeur du CPU non ?
sinon, à partir de quand une tache deviens "lourde" ?  [:tinostar]


 
 
Threads hardware, c'est pas tout à fait coeur, mais presque. Sur du intel, avec hyperthreading, tu as deux threads hardware par coeur.
Encore une fois, la "lourdeur" de la tâche dépend du contexte. Suivant les contraintes que tu as en terme de temps (obligation de terminer tes tâches dans un délais imparti) ou d'architecture (genre rentabiliser les SPUs d'un cell qui n'ont pas d'accès direct à la mémoire) ta notion de "lourdeur" va varier.
 
J'ai tendance à penser qu'un système de jobs "légers", ca va être quand tu as besoin que tous tes jobs aient terminé pour pouvoir en lancer de nouveaux (à peu de choses près) alors que le système de tâches "lourdes", ca va plus être du fire and forget ... Ca termine quand ca peut et tu en tiens compte à ce moment là.
 
Exemple : dans un jeu vidéo, tu vas sans doute avoir une phase de mise à jour des positions de tes objets que tu vas pouvoir paralléliser, et tu vas avoir une attente forte dessus pour pouvoir afficher le résultat à l'écran. Là, clairement, on est dans un contexte de tâches "légères" (c'est critique si ca prend plus de 2 ou 3 millisecondes). Par contre, une requête d'IA, si c'est pas immédiat, c'est peut-être pas si grave, du coup c'est "lourd" (quelques dizaines de millisecondes)
 
Dans le cas des tâches "légères", tu veux clairement faire correspondre tes threads disponibles à tes ressources matérielles parce que c'est critique.
Dans le cas des tâches "lourdes", tu veux juste avoir un pool de threads sous le coude pour ne pas payer le coût de la création d'un thread à chaque lancement de tâche.


---------------
last.fm
n°2149636
theshockwa​ve
I work at a firm named Koslow
Posté le 17-07-2012 à 11:32:44  profilanswer
 

Autre détail : tu as des mutex pour protéger ta file de tâches. Normalement, tu dois pouvoir faire une "lock-free queue". En terme de performance, ca peut changer la vie.
Si tu vires ce mutex, la condition ne sera plus utilisable, tu n'auras plus besoin que d'un système de signal basique et tes threads attendront ce signal quand la liste de tâches est vide. Ton manager pourra réveiller tous les threads ou juste ceux qu'ils choisi en fonction de ce qui arrive à traiter.


---------------
last.fm
n°2149642
in_your_ph​ion
Posté le 17-07-2012 à 12:07:27  profilanswer
 

theshockwave a écrit :

 


Threads hardware, c'est pas tout à fait coeur, mais presque. Sur du intel, avec hyperthreading, tu as deux threads hardware par coeur.
Encore une fois, la "lourdeur" de la tâche dépend du contexte. Suivant les contraintes que tu as en terme de temps (obligation de terminer tes tâches dans un délais imparti) ou d'architecture (genre rentabiliser les SPUs d'un cell qui n'ont pas d'accès direct à la mémoire) ta notion de "lourdeur" va varier.

 

J'ai tendance à penser qu'un système de jobs "légers", ca va être quand tu as besoin que tous tes jobs aient terminé pour pouvoir en lancer de nouveaux (à peu de choses près) alors que le système de tâches "lourdes", ca va plus être du fire and forget ... Ca termine quand ca peut et tu en tiens compte à ce moment là.

 

Exemple : dans un jeu vidéo, tu vas sans doute avoir une phase de mise à jour des positions de tes objets que tu vas pouvoir paralléliser, et tu vas avoir une attente forte dessus pour pouvoir afficher le résultat à l'écran. Là, clairement, on est dans un contexte de tâches "légères" (c'est critique si ca prend plus de 2 ou 3 millisecondes). Par contre, une requête d'IA, si c'est pas immédiat, c'est peut-être pas si grave, du coup c'est "lourd" (quelques dizaines de millisecondes)

 

Dans le cas des tâches "légères", tu veux clairement faire correspondre tes threads disponibles à tes ressources matérielles parce que c'est critique.
Dans le cas des tâches "lourdes", tu veux juste avoir un pool de threads sous le coude pour ne pas payer le coût de la création d'un thread à chaque lancement de tâche.

 


merci bien pour toutes ces explications ! pourtant il m'avait semblé lire que le thread pool devait être utilisé pour des tâches qui ne durent pas très longtemps, mais peut être que c'est relatif ?

 

Par exemple, si tu as une matrice géante et que tu veux paralléliser les calculs (un thread par calcul de produit scalaire ligne x colonne), est ce qu'un thread pool est approprié ?

 
theshockwave a écrit :

Autre détail : tu as des mutex pour protéger ta file de tâches. Normalement, tu dois pouvoir faire une "lock-free queue". En terme de performance, ca peut changer la vie.
Si tu vires ce mutex, la condition ne sera plus utilisable, tu n'auras plus besoin que d'un système de signal basique et tes threads attendront ce signal quand la liste de tâches est vide. Ton manager pourra réveiller tous les threads ou juste ceux qu'ils choisi en fonction de ce qui arrive à traiter.

 

merci je vais essayer ça  :jap: dans ce cas le signal est juste une variable partagée par les threads ? (genre un booléen ?)

Message cité 1 fois
Message édité par in_your_phion le 17-07-2012 à 12:08:05
n°2149646
theshockwa​ve
I work at a firm named Koslow
Posté le 17-07-2012 à 12:38:39  profilanswer
 

in_your_phion a écrit :


 
 
merci bien pour toutes ces explications ! pourtant il m'avait semblé lire que le thread pool devait être utilisé pour des tâches qui ne durent pas très longtemps, mais peut être que c'est relatif ?  
 
Par exemple, si tu as une matrice géante et que tu veux paralléliser les calculs (un thread par calcul de produit scalaire ligne x colonne), est ce qu'un thread pool est approprié ?
 


 
houlà, c'est toujours des threads pools, ce sont juste deux cas de figures différents qui font que tu vas pas forcément programmer ton pool de la même manière. Quoiqu'il arrive, oui, quand tu veux paralléliser c'est une bonne idée d'avoir un pool de threads, quelles que soient tes contraintes. Ton exemple de multiplication de matrices me fait plus penser à un process léger, justement, où tu vas distribuer ton calcul de manière à ce que tous tes threads hardwares tournent à 100% mais pas plus. En gros, sur ce genre de tâches, on cherche aussi à minimiser les changements de tâches des CPUs. Passer d'un contexte de thread à un autre n'est pas gratuit pour le matériel. Quand tu compares ca à une série de multiplications et additions même en flotttants, ca reste conséquent.
 
 

in_your_phion a écrit :

merci je vais essayer ça  :jap: dans ce cas le signal est juste une variable partagée par les threads ? (genre un booléen ?)


Dans l'idée, c'est un booléen, oui, mais tu dois avoir un système de signal dans les pthreads, j'imagine, qui te permettra de le faire correctement. (attention à ne pas utiliser juste un signal "instantané" que tes threads risqueraient de rater mais bien quelque chose qui place un flag jusqu'à ce que ce soit resetté soit manuellement, soit implicitement par une lecture)
 


---------------
last.fm
n°2149699
xilebo
noone
Posté le 17-07-2012 à 20:43:41  profilanswer
 

in_your_phion a écrit :


 
hello !
merci pour tes réponses, pour les locks/unlocks en fait j'ai eu un peu la flemme de les faire en RAII  :jap:  
 
sinon, comment je pourrais architecturer différement pour ne pas avoir à killer mes threads ...? il y a une technique pour ça ?
 
autrement, comment on choisis le nombre de threads ? J'ai essayé avec plusieurs valeurs, même jusqu'à 512 lol. ça doit être combien généralement ..?
 
merci  ;)  
 
ps : par ailleurs, pourquoi les "cout" s'affichent souvent un peu n'importe comment, alors que pas les printf ? (pour les mêmes messages)


 
 
Ce que je fais en général, c'est une boucle de travail dans mon thread avec une variable de controle , et je suspend mon thread pendant un temps donné s'il n'a rien à faire.  
 
Pour le détruire, il suffit de changer la valeur de la variable de controle, de resumer le thread et de le join.
 

Code :
  1. void * ma_fonction_thread( void * param )
  2. {
  3. while ( !sortie )
  4. {
  5.   // faire le travail s'il y a
  6.   // puis suspendre le thread un temps donné
  7.    struct timespec timeout; // mettre le temps voulu
  8.    pthread_mutex_lock(&m_hMutex);
  9.    pthread_cond_timedwait(&m_hEvent,&m_hMutex, &timeout);
  10.    pthread_mutex_unlock(&m_hMutex);
  11. }
  12. return NULL;
  13. }
  14. // la fonction qui resume  
  15. void resume( )
  16. {
  17. pthread_mutex_lock(&m_hMutex);
  18. pthread_cond_signal(&m_hEvent);
  19. pthread_mutex_unlock(&m_hMutex);
  20. }
  21. // il faut en plus initialiser de cette facon la le thread
  22. void init()
  23. {
  24. pthread_cond_init(&m_hEvent,NULL);
  25. pthread_mutex_init(&m_hMutex,NULL);
  26. }
  27. // et detruire
  28. void shut()
  29. {
  30. pthread_cond_destroy(&m_hEvent);
  31. pthread_mutex_destroy(&m_hMutex);
  32. }
  33. // pour terminer proprement le thread  
  34. void terminate()
  35. {
  36. // on change la valeur de la variable de controle
  37. sortie = true;
  38. // on resume pour eviter d'attendre un tour de boucle
  39. resume();
  40. // et on join
  41. pthread_join(m_hThread, NULL);
  42. }


 
En espérant que ca t'aide. (j 'ai juste fait du copier coller de code pour expliquer le principe, il faut bien sur tester les valeurs retournées des appels système )

mood
Publicité
Posté le 17-07-2012 à 20:43:41  profilanswer
 

n°2149728
theshockwa​ve
I work at a firm named Koslow
Posté le 18-07-2012 à 10:20:20  profilanswer
 

xilebo a écrit :

Ce que je fais en général, c'est une boucle de travail dans mon thread avec une variable de controle , et je suspend mon thread pendant un temps donné s'il n'a rien à faire.  
 
Pour le détruire, il suffit de changer la valeur de la variable de controle, de resumer le thread et de le join.


 
 :jap:


---------------
last.fm
n°2149730
in_your_ph​ion
Posté le 18-07-2012 à 11:03:09  profilanswer
 

xilebo a écrit :

En espérant que ca t'aide. (j 'ai juste fait du copier coller de code pour expliquer le principe, il faut bien sur tester les valeurs retournées des appels système )

 

salut,
merci beaucoup pour ton code :) je vois mieux là ^^

 

en fait j'avais essayé avec ça mais ça ne marchais pas terrible pour mon code, car j'avais mis un wait sans timeout, mais avec un timeout ça devrait plus le faire

 

ps : j'aurais encore une autre question, est ce que faire une boucle avec des try_lock() peut être vu comme un spin lock ?

 

merci

Message cité 1 fois
Message édité par in_your_phion le 18-07-2012 à 11:03:27
n°2149733
theshockwa​ve
I work at a firm named Koslow
Posté le 18-07-2012 à 11:09:35  profilanswer
 

in_your_phion a écrit :


 
salut,
merci beaucoup pour ton code :) je vois mieux là ^^  
 
en fait j'avais essayé avec ça mais ça ne marchais pas terrible pour mon code, car j'avais mis un wait sans timeout, mais avec un timeout ça devrait plus le faire
 
ps : j'aurais encore une autre question, est ce que faire une boucle avec des try_lock() peut être vu comme un spin lock ?
 
merci


 
Pas besoin de timeout, il faut juste signaler ta condition. Du point de vue du threads, tu dois traiter les deux cas, soit il y a effectivement quelque chose à consommer, soit il n'y a rien et c'est peut-être qu'il faut quitter (consulte le flag correspondant)


---------------
last.fm

Aller à :
Ajouter une réponse
  FORUM HardWare.fr
  Programmation
  C++

  Thread Pool

 

Sujets relatifs
Optimisation et multi-thread en Cécrire avec un thread en binaire ou en ascii ?
Boost::thread => perte de perfException in thread "AWT-EventQueue-0" java.lang.NoSuchMethodError
[java] Donner "avantage" à un thread[Python] Script crawler Multi-thread
pool de threadPool de thread
Pool de thread et scruter un repertoirethread pool
Plus de sujets relatifs à : Thread Pool


Copyright © 1997-2022 Hardware.fr SARL (Signaler un contenu illicite / Données personnelles) / Groupe LDLC / Shop HFR