© Дамир Тенишев, 2007-09-02 - 2007-09-16. Версия 1.1. Назначение: Сопровождающий документ курса ЛОиПО АС. Уровень подготовки: Практикующий программист.
Совместный труд на мою пользу объединяет!
Кот Матроскин

Статья рассказывает о том, как с минимальными усилиями задействовать двухядерную или двухпроцессорную архитектуру в вашем приложении.

Последнее время тактовая частота процессоров приблизилась к пределу, для преодоления которого требуется разработка принципиально новых технологий изготовления микросхем. Дальнейший рост тактовой частоты процессоров будет незначительным.

Производительность персональных компьютеров будет достигаться числом процессоров или ядер в одном кристалле. При этом в целях снижения энергопотребления частота таких многоядерных процессоров будет даже ниже частоты среднего одноядерного процессора.

В такой ситуации очень интересной является тема использования вычислительной мощности многоядерных (многопроцессорных) систем.

Данная статья знакомит читателя с принципами использования многопоточности в операционной системе Windows на примере переработки однопоточного приложения в многопоточное.

Цель использования многопоточности – получить программу, которая будет работать быстрее на многопроцессорных системах.

Пример

В качестве примера выбран фрагмент реальной системы, выполняющий чтение файла и частотный анализ строк. Данный пример считаю очень показательным, поскольку чтение данных с диска и их предварительная обработка присутствует в большинстве систем и при больших объёмах данных может выполняться достаточно долго. Потери времени ведут к удорожанию процессов, раздражению пользователей и порой даже изменению производственных процессов.

Для чистоты эксперимента мной был создан новый проект, в который скопирован код, относящийся к рассматриваемому вопросу.

Рассмотрим оригинальный код однопоточного приложения:

stater.h

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include "stdafx.h"
#include 
#include 
#include 
#include “timer.h”

using namespace std;

void RemoveLeadingBlanks(string& s) {
  string::size_type pos = s.find_first_not_of(" \t");
  if (string::npos == pos) { s=""; return; }
  if (           0 == pos) return;

  s.replace(0,pos,"");
}

int _tmain(int argc, _TCHAR* argv[])
{
  ifstream in("all.dat");

  if (!in) {
    cerr << "Can't open all.dat" << endl;
    return 1;
  }

  map lines_base;
  std::string line;

  Timer timer;

  while (!in.eof()) {
    std::getline(in,line);
    RemoveLeadingBlanks(line);
    if (line=="") continue;
    ++lines_base[line];
  }

  cout << "Spend time " << timer.getTime() << "sec."<< endl;

  return 0;
}

Функция RemoveLeadingBlanks служит для удаления в начале строки символов табуляции и пробелов.

В строке 20 определён ассоциативный массив, в котором для каждой строки (текст) хранится число её обнаруженных вхождений.

В цикле (строки 23-28) производится чтение строки из файла и увеличение счётчика обнаруженных вхождений (если такая строка уже была считана).

Для замера времени работы программы используется класс Timer:

timer.h

 1
 2
 3
 4
 5
 6
 7
class Timer {
  unsigned long  start_time;
public:
  Timer();

  float getTime() const;
};

timer.cpp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
#include "stdafx.h"
#include "timer.h"
#include 
#include 

Timer::Timer() {
  start_time = timeGetTime();
}

float Timer::getTime() const {
  return (timeGetTime()-start_time)/1000.0f;
}

На моей конфигурации (Intel PentiumD 3.4 GHz) исходный файл размером в миллион строк обрабатывается этой программой 3.515 секунды. При этом Task Manager показывает в графе “CPU” 50%-ую загрузку процессора. То есть работает только одно ядро, что логично для данной программы.

Если вы используете VS.NET 2005 для данной программы и для всех последующих версий необходимо переключить настройки генерации кода. В свойствах проекта опция Comfigaration Properties / C/C++ / Code Generation / Runtime Library должна быть установлена в Multi-threaded (/MT) для конфигурации Release и в Multi-threaded Debug (/MTd) для конфигурации Debug. По умолчанию проект создаётся с настройкой Multi-threaded DLL, которая необходима только при разработке приложений использованием динамически линкуемых библиотек. При её использовании время выполнения приложения увеличивается на 36% (для данного приложения).

Шаг 1. Анализ

В первую очередь необходимо провести анализ кода для того, чтобы понять, как данная программа может быть адаптирована для работы в многопроцессорной системе. То есть понять, как её следует разделить на два потока.

Очевидно, что основная работа программы сосредоточена в следующих строках кода:

Фрагмент stater.cpp

24
25
26
27
std::getline(in,line);
RemoveLeadingBlanks(line);
//…
++lines_base[line];

Почему очевидно? Эти строки выполняются в цикле для обработки всего входного файла. Чтение из файла, удаление лидирующих пробелов и добавление в базу – основные операции, требующие времени.

Те, кто не имеет достаточного опыта в определении участков кода, потребляющих процессорное время, могут воспользоваться программами-профайлерами, которые показывают число вызов каждой инструкции и суммарное время, затраченное на их выполнение. Например, Compuware DevPartner или Intel VTune.

Анализ данного участка кода показывает, что для файла в миллион строк примерно по 20% времени выполнения занимают строки 24 и 25 и 40% занимает строка 27. Замечание о размере обрабатываемого файла важно, поскольку время выполнения строк 24 и 25 константно и не меняется входе выполнения программы. Время выполнения строки 27 зависит от числа уже помещённых в ассоциативный массив строк. С увеличением их числа в массиве время добавления нового элемента увеличивается. Эта особенность рассматриваемого алгоритма ещё сыграет свою роль при анализе методов синхронизации потоков выполнения.

При решении данной задачи будем полагать, что целевая конфигурация имеет два процессора (или два ядра), что справедливо для большинства современных персональных компьютеров. Для равномерной загрузки процессоров необходимо разделить код на две равные по вычислительной нагрузке части.

Предлагаю разнести в два отдельных потока чтение строки из файла с её предварительной обработкой и добавление строки в ассоциированный массив. Каждый из потоков будет заниматься своей работой. Один – читать из файла и убирать лишние пробелы, второй – добавлять данные в ассоциированный массив.

Шаг 2. Создание потока

Для того чтобы задействовать несколько процессоров, необходимо создать независимые потоки управления, каждый из которых сможет выполняться на отдельном процессоре.

Для создания потока воспользуемся функцией WinAPI:

::_beginthread(MapInserter,0,NULL)

Описание параметров можно найти в MSDN. В данном случае мы указываем, что создаётся отдельный поток управления, который начнёт функционирование в функции MapInserter. То есть эта функция будет выполняться независимо и параллельно с основной веткой программы в main.

Для того, чтобы код с вызовом этой функции скомпилировался, необходимо подключить заголовочный файл process.h.

Если функция MapInserter завершит выполнение, то поток завершит выполнение вместе с ней. Поэтому данная функция должна быть построена так, чтобы циклически выполнять заданное действие.

Созданный поток будет автоматически уничтожен при выходе из функции MapInserter, поэтому дополнительно никакие функции (например, ::_endthread) вызывать не надо.

Более того - _endthread является «аварийным» способом завершения потока и им не следует пользоваться без необходимости. Хотя бы потому, что при этом поток будет прерван в текущей точке выполнения и, как минимум, не будут вызываны деструкторы локальных объектов выполняемой функции. Также могут возникнуть проблемы с целостностью используемых ресурсов (например, часть важной операции не будет выполнена до конца).

Шаг 3. Наивное решение

Теперь у нас есть два независимых потока и нам необходимо передавать между ними данные. Как передать данные из функции main в функцию MapInserter?

Первым решением, не учитывающим специфику работы независимых потоков, использующих одни и те же данные, будет следующее решение:

Фрагмент MapInserter.cpp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include "stdafx.h"
#include 
#include 
#include "MapInserter.h"

using namespace std;

map lines_base;
static volatile bool stop_map_inserter=true;
static string line_to_add;

void MapInserter( void ) {
  stop_map_inserter = false;
  while(!stop_map_inserter) {
    ++lines_base[line_to_add];
  }
}

void AddToDataBase(const string& line) {
  line_to_add = line;
}

void StopMapInserterThread() {
  stop_map_inserter = true;
}

Функция AddToDataBase является интерфейсом данного потока к внешнему миру. В то время как функция MapInserter выполняется постоянно (и всегда готова добавлять элементы в массив) функция AddToDataBase передаёт ей строки, необходимые для добавления через глобальную переменную line_to_add.

Функция AddToDataBase будет вызываться функцией-клиентом, желающим поместить данные в массив. В нашем случае это будет функция main. Поскольку AddToDataBase будет вызываться из функции main, она будет работать в потоке функции main (будем называть его первым потоком).

Фрагмент stater.cpp

24
25
26
27
std::getline(in,line);
RemoveLeadingBlanks(line);
//…
AddToDataBase(line);

Когда клиентский код из функции main захочет добавить строку в массив, он вызовет AddToDataBase, передав ей строку в качестве аргумента. Фукнция AddToDataBase в строке 16 поместит строку в глобальную переменную line_to_add.

Постоянно выполняющийся цикл во втором потоке (строки 11-13) добавит необходимую информацию в массив.

Переменная stop_map_inserter играет роль команды «остановить поток». Объявление её как volatile указывает компилятору, что переменная может изменяться и использоваться в разных потоках и поэтому её нельзя кэшировать или подвергать код с её участием оптимизации. Без такого указания цикл в строках 11-13 был бы «вечным циклом», поскольку условие в строке 11 считалось бы неизменным. Функция StopMapInsertedThread устанавливает значение true этой переменной, что приводит к невыполнению условия в строке 11 и завершению работы потока.

Однако, данный код ошибочен. И основная ошибка в нём – отсутствие синхронизации потоков. Оба потока работают независимо друг от друга и второму потоку ничего не известно о том, когда первый поток поместит в глобальную переменную новую строку. А первому потоку ничего не известно о том, когда второй поток завершит добавление строки и можно будет ему передавать новую строку.

После запуска эта программа может в лучшем случае без сбоя выполнить несколько итераций, после чего, скорее всего, выполнит недопустимую операцию и завершится аварийно. Причиной аварийного завершения будет попытка одновременного доступа к глобальной переменной line_to_add из двух потоков одновременно, когда первый поток будет записывать новую строку, а второй продолжать добавление старой строки.

Шаг 4. Синхронизация подручными средствами

Нам необходимо решить задачу синхронизации потоков:

Для решения данной задачи введём глобальный флаг, определяющий готовность строки к добавлению в массив.

Фрагмент MapInserter.cpp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include "stdafx.h"
#include 
#include 
#include "MapInserter.h"

using namespace std;

map lines_base;
static string        line_to_add;
static volatile bool line_ready_to_add=false;
static volatile bool stop_map_inserter=true;

void MapInserter( void* ) {
  stop_map_inserter=false;
  while (!stop_map_inserter) {
    if (line_ready_to_add) {
      string line;
      line=line_to_add;
      line_ready_to_add = false;
      ++lines_base[line];
    }
  }
}

void AddToDataBase(string& line) {
  while (line_ready_to_add)
    ;
  line_to_add=line;
  line_ready_to_add = true;
}

void StopMapInserterThread() {
  stop_map_inserter = true;
}

Флаг line_ready_to_add задан с ключевым словом volatile с тем, чтобы пояснить компилятору, что данная переменная используется совместно двумя потоками и её значение может быть изменено «извне» текущего потока управления. Без такого указания конструкция в строках 12-19 была бы «вечным циклом» при значении line_ready_to_add равном true или «пустым кодом» при значении false.

Начальное значение флага line_ready_to_add равно false, что означает отсутствие строки, готовой к помещению в массив.

Второй поток после создания выполняет «вечный цикл» в строках 12-19. Завершение этого цикла означало бы выход из функции потока и завершение выполнения данного потока. В цикле проверяется значение флага line_ready_to_add и, поскольку он имеет значение false, добавление строки не производится.

При вызове фукнции AddToDataBase проверяется значение флага и, поскольку он изначально false, то управление сразу передаётся в сторку 24, где строка-аргумент копируется в глобальную переменную и флагу line_ready_to_add присваивается значение true.

Выполняющаяся параллельно MapInserter на очередной итерации цикла выясняет, что значение флага line_ready_to_add стало истинно и выполняет строки 14-17, в которых создаётся локальная копия строки и эта копия добавляется в массив.

Создание локальной копии строки – длительная операция и в это время клиент может, не дождавшись завершения добавления строки, вновь вызывать AddToDataBase. В этом случае сработает проверка в строках 22-23. Флаг line_ready_to_add будет сохранять значение true всё время, пока выполняется строка 15. Таким образом, первый поток будет ожидать завершения этой операции. И только когда выполнится строка 16, флаг AddToDataBase вновь станет false и функция AddToDataBase продолжит выполение со строки 24.

После того, как в 15 строке будет разрешено добавление новой строки, в строке 17 начнётся длительная операция добавления локальной копии строки в ассоциативный массив.

Время выполнения программы – 2.796 секунды, то есть в 1.25 раза быстрее, чем у оригинального кода. При этом TaskManager показывает 98%-ную загрузку процессора данным приложением. Что же явилось причиной такого малого прироста производительности?

Рассмотрим типичный сценарий работы этого кода.

Предположим, что вставка элемента занимает намного меньше времени, чем его чтение из файла и удаление пробелов. В этом случае функция MapInserter проводит большую часть времени в строках 12-13 в ожидании, когда же поступит новая строка для обработки. При этом поток занимает весь процессорный ресурс, циклически проверяя состояние флага.

В обратном случае, когда добавление нового элемента занимает намного больше времени, чем его чтение из файла и удаление пробелов, AddToDataBase проводит большую часть времени в строках 22-23, ожидая готовности MapInserter’a.

Учитывая, что время выполенния операции добавления строки в ассоциированный массив непостоянно, программа периодически попадает либо в первую, либо во вторую ситуацию.

Также в коде остаётся трудноуловимая ошибка: реально клиентский код первого потока, создавший второй поток ничего не знает о том, добавлена ли строка. Он только передаёт эту строку на добавление. При этом он принимает решение о закрытии потока. Необходимо, чтобы первый поток дожидался завершения работы второго потока перед тем, как его уничтожить.

То есть, возможна ситуация, когда клиентский код вызовет функцию AddToDataBase и сразе же после неё – StopMapInserterThread. Если предположить, что код второго потока в это время находится ещё в строке 17 (что вполне разумно, поскольку это достаточно длительная операция), то к моменту вызова StopMapInserterThread получится, что последняя переданная клиентом строка ещё находится в переменной line_to_add в ожидании помещения в массив. Но установка в true переменной stop_map_inserter приведёт к выходу из цикла в строке 12 до того, как line_to_add будет добавлена в массив.

Для этого код функции StopMapInserterThread, придётся изменить для того, чтобы второй поток успел записать последнюю строку в массив.

Фрагмент MapInserter.cpp

27
28
29
30
31
void StopMapInserterThread() {
  while (line_ready_to_add)
    ;
  stop_map_inserter = true;
}

Клиентский код будет выглядеть следующим образом:

Фрагмент stater.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include "stdafx.h"
#include 
#include 
#include 
#include 
#include 

#include "timer.h"
#include "MapInserter.h"

using namespace std;

void RemoveLeadingBlanks(string& s)
{
  string::size_type pos = s.find_first_not_of(" \t");
  if (string::npos == pos) { s=""; return; }
  if (           0 == pos) return;

  s.replace(0,pos,"");
}

 
int _tmain(int argc, _TCHAR* argv[])
{
  ifstream in("all.dat");

  if (!in) {
    cerr << "Can't open all.txt" << endl;
    return 1;
  }

  std::string line;

  Timer timer;

  uintptr_t map_inserter_thread = ::_beginthread(MapInserter,0,NULL);

  while (!in.eof()) {
    std::getline(in,line);
    RemoveLeadingBlanks(line);
    if (line=="") continue;
   

    AddToDataBase(line);
  }

  cout << "Spend time " << timer.getTime() << "sec."<< endl;

  StopMapInserterThread();

  return 0;
}

Шаг 5. Синхронизация средствами операционной системы

Основной недостаток синхронизации с помощью флагов заключается в том, что операционная система не получает информации о том, что поток находится в ожидании и он продолжает занимать вычислительные ресурсы, циклически проверяя значение флага. Таким образом, даже при таком простое процессор загружен полностью. В случае если в вашем приложении больше потоков, чем процессоров на целевом компьютере или если паралельно с вашим приложением выполняются другие приложения, такая нагрузка на процессор недопустима. Необходимо сообщить операционной системе, что вычислительный ресурс свободен и может использоваться другим потоком. Для этого существуют специальные средства сихнронизации потоков, предоставляемые операционной системой. Они не только обеспечивают синхронизацию, но также дают возможность операционной системе получать необходимую информацию о простаивающих в ожидании процессорах и использовать их для исполнения других потоков.

Операционная система предоставляет ряд механизмов синхронизации, из которых для нашего приложения самым близким будет механизм сообщений (events).

Основные изменения затронут модуль MapInserter.cpp

Фрагмент MapInserter.cpp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include "stdafx.h"
#include 
#include 
#include "MapInserter.h"

using namespace std;

map lines_base;
string line_to_add;

HANDLE  hEvDataReady=NULL;
HANDLE  hEvInserterReady=NULL;

static volatile bool stop_map_inserter=true;

void MapInserter( void *){
  stop_map_inserter = false;
  ::WaitForSingleObject(hEvDataReady,INFINITE);
  while ( !stop_map_inserter ) {
    ++lines_base[line_to_add];
    ::SetEvent(hEvInserterReady);
    ::WaitForSingleObject(hEvDataReady,INFINITE);
  }
}

void AddToDataBase(const string& line) {
  ::WaitForSingleObject(hEvInserterReady,INFINITE);
  line_to_add = line;
  ::SetEvent(hEvDataReady);
}

void StopMapInserterThread() {
  ::WaitForSingleObject(hEvInserterReady,INFINITE);
  stop_map_inserter = true;
  // Придётся дать фиктивный сигнал
  ::SetEvent(hEvDataReady); 
}

В строках 8-9 объявлены дескрипторы событий, которые будут использоваться для информирования о готовности строки к помещению в массив и о готовности потока к помещению в массив следущей строки. Cоответствующие события создаются в модуле stater.cpp (см. ниже).

В строке 13 начинается ожидание сигнала «строка готова». До получения этого сигнала управление переходит операционной системе, и вычислительный ресурс не используется. Когда будет получен соответствующий сигнал, функция произведёт добавление строки в массив и по готовности активирует сигнал «поток готов к вставке строки» и снова перейдёт к ожиданию сигнала «строка готова».

Функция AddToDataBase начинается с ожидания сигнала «поток готов к вставке строки», получив который (а изначально он есть), функция копирует аргумент в глобальную переменную и активирует сигнал «строка готова».

Функция StopMapInserterThread ожидает сигнала «поток готов к вставке строки», который сигнализирует о том, что строка добавлена в массив, и поток можно остановить. После получения сигнала функция выставляет флаг остановки потока (строка 27) и дальше приходится дать фиктивный сигнал «данные готовы» (строка 29). Реально никаких данных нет, но нам требуется, чтобы усправление в строке 17 было передано потоку, и выполнилась проверка условия в строке 14. То есть для того, чтобы поток завершил выполнение, ему необходимо вернуть управление.

В модуле stater.cpp создаются и уничтожаются флаги:

Фрагмент stater.cpp

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  std::string line;
  Timer timer;

  hEvDataReady     = ::CreateEvent(NULL,FALSE,FALSE,NULL);
  hEvInserterReady = ::CreateEvent(NULL,FALSE,TRUE ,NULL);

  uintptr_t map_inserter_thread = ::_beginthread(MapInserter,0,NULL);

  while (!in.eof()) {
    std::getline(in,line);
    RemoveLeadingBlanks(line);
    if (line=="") continue;

    AddToDataBase(line);
  }

  cout << "Spend time " << timer.getTime() << "sec."<< endl;

  StopMapInserterThread();

  ::CloseHandle(hEvInserterReady);
  ::CloseHandle(hEvDataReady);

Описание параметров функции CreateEvent можно найти в MSDN. Важное отличие лишь в том, что событие hEvInserterReady создаётся в состоянии «сигнал есть», в то время как событие hEvDataReady создаётся в состоянии «сигнала нет». То есть начальное состояние – «поток готов к вставке строки» и «строка не готова».

Обратите внимание, что флаги должны создваться до создания потока и удаляться после остановки потока. То есть они должны существовать всё время выполнения потока.

Здесь обнаруживается ещё одна трудноуловимая ошибка: функция StopMapInserterThread в её текущем исполнении возвращает управление клиенту, не дождавшись завершения работы потока MapInserter. В данной программе это не приводит к проблемам. Однако в общем случае это также может привести к ошибкам и необходимо обеспечить необходимые проверки. При этом проверки необходимо обеспечить в любом случае, даже если это в данный момент алгоритм безопасен с этой точки зрения. Позже алгоритм может измениться, а проверку вы скорее всего забудете добавить.

После запуска программы нас ждут две новости: плохая и хорошая. Хорошая новость состоит в том, что управление, действительно, передаётся другим потокам и приложение занимает теперь около 40 % ресурсов процессора. Плохая новость состоит в том, что приложение теперь выполняется 24 секунды! Что же явилось причиной такого увеличения времени работы программы?

К сожалению, при всей полезности системы сообщений (events) и ожидания событий (WaitForSingleObject) их использование не бесплатно. Накладные расходы на использование одной пары Event/Wait составляют 15 микросекунд (на моей конфигурации), что в пять раз превышает среднее время выполнения операции добавления элемента в массив. В результате накладные расходы на обеспечение синхронизации потоков превышают полезную работу.

Шаг 5. Буферизация.

Мы имеем две проблемы:

  1. Время, затрачиваемое на операцию синхронизации потоков, велико по сравнению со временем выполнения одной операции
  2. Время выполнения операции добавления элемента в массив непостоянно, растёт с увеличением числа прочитанных строк и при добавлении конкретной строки зависит от текущего состояния массива.

Решить обе указанные проблемы можно, используя буферизацию передаваемой между потоками информации. Первый поток считывает строки и передаёт их второму потоку. При этом они сохраняются в промежуточном буфере.

Для того чтобы запись в буфер в первом потоке и чтение из буфера во втором потоке могли происходить одновременно, реально создаётся два буфера. В один производится запись новых строк, а из второго производится чтение строк и помещение их в массив.

Когда первый буфер заполняется, запись переключается на второй буфер, а первый используется вторым потоком для чтения.

Исходный код модуля stater.cpp не изменится. Буферизация должна быть реализована прозрачно для клиентского модуля.

Рассмотрим изменения в MapInserter.cpp

Фрагмент MapInserter.cpp

 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
const int BufferSize=3000;
static string buffer[2][BufferSize];
static int    write_pos[2]={0,0};
static volatile int write_buffer=0;

HANDLE  hEvDataReady=NULL;
HANDLE  hEvInserterReady=NULL;

void MapInserter( void* ) {
  stop_map_inserter=false;
  ::WaitForSingleObject(hEvDataReady,INFINITE);
  while ( !stop_map_inserter ) {
    int read_buffer=(write_buffer+1)%2;

    const size_t size = write_pos[read_buffer];
    for (size_t i=0; i < size; ++i) {
      ++lines_base[buffer[read_buffer][i]];
    }

    write_pos[read_buffer]=0;

    ::SetEvent(hEvInserterReady);
    ::WaitForSingleObject(hEvDataReady,INFINITE);
  }
}

void AddToDataBase(string& line) {
  buffer[write_buffer][write_pos[write_buffer]++].swap(line);
  if (write_pos[write_buffer] >= BufferSize) {
    ::WaitForSingleObject(hEvInserterReady,INFINITE);
    write_buffer=(write_buffer+1)%2;
    ::SetEvent(hEvDataReady);
  }
}

void StopMapInserterThread() {
  ::WaitForSingleObject(hEvInserterReady,INFINITE);
  write_buffer=(write_buffer+1)%2;
  ::SetEvent(hEvDataReady);
  ::WaitForSingleObject(hEvInserterReady,INFINITE);

  stop_map_inserter = true;
  // Придётся дать фиктивный сигнал
  ::SetEvent(hEvDataReady); 
}

В строке 8 определен двумерный массив (два массива по BufferSize элементов). Он будет использоваться для записи и чтения данных при передаче между потоками. Два индекса write_pos используется для хранения текущей позиции записи в массиве. Переменная write_buffer служит для хранения индекса текущего буфера, используемого для записи.

Алгоритм синхронизации потоков – прежний. Различие только в том, что передаётся не один элемент, а буфер.

Функция StopMapInserterThread должна переключить незаполненный массив и дождаться, когда он будет полностью обработан. Это необходимо, чтобы гарантировать, что последние данные, переданные на обработку, не будут потеряны.

Время выполнения программы – 2.14 секунды, то есть в 1.64 раза быстрее, чем у оригинального кода. При этом TaskManager показывает в среднем 93%-ную загрузку процессора. Требуемый результат достигнут. Можно ли добиться двукратного ускорения работы программы? Скорее всего – нет. Поскольку для этого потребуется обеспечить абсолютно одинаковую нагрузку в обоих потоках, что сделать затруднительно. Полученный прирост производительности – очень хороший результат для двухядерного процессора при минимальных усилиях.

Заключение

В статье рассмотрен способ преобразования однопоточного приложения в многопоточное.

Кое-что осталось за пределами рассмотрения в данной статье. В частности – эффективность использования STL в задаче, где основной целью является производительность вычислений.

Переписывание файлового ввода этой программы с потоков с стиле C++ (ifstream) к потокам в стиле C (FILE*) позволяет увеличить скорость выполнения однопоточного варианта программы в 1.34 раза (время выполнения – 2.625 секунды). Но многопоточный вариант этой программы уже незначительно улучшает однопоточную версию – в 1.175 раза (время выполнения – 2.234 секунды). Причина этого в том, что потоки становятся неравноценными – чтение из файла значительно быстрее, чем добавление в массив. Это подтверждает и загрузка процессора – около 70%. То есть процесс чтения данных большую часть времени простаивает в ожидании готовности потока добавления в массив.

Но цель данной статьи – показать способ создания многопоточного приложения, а не оптимизация производительности программы. Это тема для отдельной беседы. Безусловно, производительность этой программы можно поднять ещё как минимум в три раза.

Со всеми замечаниями, дополнениями, вопросами пишите на почтовый ящик, адрес которого можно найти на сайте tenisheff.ru.

Дата последнего обновления 6.11.2015