суббота, 27 марта 2010 г.

Распределенные математические вычисления на языке Erlang

В статье иллюстрируется пример настройки окружения и серверов для возможности проведения распределенных вычислений на языке Erlang, а также их осуществление. В качестве примера предлагается перемножение матрицы на вектор с использованием пула pool(3) серверов Erlang. В данном случае пул состоит из 3 машин под управлением ОС FreeBSD.


Введение
Многие из вас, наверняка, уже использовали вызов spawn[_link] в своих приложениях, но немногие, как я заметил, знают о более лаконичном -- но также и более функциональном -- его аналоге pspawn[_link]. Попробуем заполнить этот пробел, решив простую математическую задачу.


Подготовка
Настройки серверов для хоста с двумя тюрьмами, расположенными на нем:
[root@conductor ~/crunching]# uname -prs
FreeBSD 7.1-RELEASE i386
[root@conductor ~/crunching]# grep '\.loc' /etc/hosts
192.168.32.40
192.168.32.201
192.168.32.202
[root@conductor ~/crunching]# jls
JID IP Address Hostname
1 192.168.32.201 j1.loc
2 192.168.32.202 j2.loc
[root@conductor ~/crunching]# ifconfig em0 | grep inet
inet 192.168.32.40 netmask 0xffffff00 broadcast 192.168.32.255
inet 192.168.32.201 netmask 0xffffff00 broadcast 192.168.32.201
inet 192.168.32.202 netmask 0xffffff00 broadcast 192.168.32.202
[root@conductor ~/crunching]# cat ~/.hosts.erlang
'j1.loc'
'j2.loc'
[root@conductor ~/crunching]# pkg_info -E erlang*
[root@conductor ~/crunching]#


Запуск мастер-ноды poolmaster@conductor Erlang на хосте, затем запуск пула ведомых нод (slaves) в тюрьмах. Возвращение списка адресов машин, составляющих пул:
[root@conductor ~/crunching]# erl -sname poolmaster -rsh ssh
Erlang R13B01 (erts-5.7.2) [source] [rq:1] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.2 (abort with ^G)
(poolmaster@conductor)1> c(pwnd).
{ok,pwnd}
(poolmaster@conductor)2> pool:start(crunchpool).
[crunchpool@j1,crunchpool@j2]
(poolmaster@conductor)3> pool:get_nodes().
[poolmaster@conductor,crunchpool@j1,crunchpool@j2]
(poolmaster@conductor)4>


Удаленное подключение к shell Erlang-ноды, находящейся на другой машине (в данном случае -- в тюрьме), и исполнение произвольного кода:
[root@conductor ~/crunching]# erl -sname client -remsh crunchpool@j1
Erlang R13B01 (erts-5.7.2) [source] [rq:1] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.2 (abort with ^G)
(crunchpool@j1)1> os:cmd("hostname").
"j1.loc\n"
(crunchpool@j1)2>


На всех узлах пула (в т. ч. на ведущем (master)) запускаются Erlang Port Mapper Daemon (epmd(1)) и beam(1):

[root@conductor ~] jexec 1 sockstat -4
USER COMMAND PID FD PROTO LOCAL ADDRESS FOREIGN ADDRESS
root beam 2863 7 tcp4 192.168.32.201:63037 *:*
root beam 2863 8 tcp4 192.168.32.201:50707 192.168.32.201:4369
root beam 2863 10 tcp4 192.168.32.201:54136 192.168.32.40:61473
root beam 2863 13 tcp4 192.168.32.201:63037 192.168.32.202:61991
root epmd 1143 3 tcp4 192.168.32.201:4369 *:*
root epmd 1143 4 tcp4 192.168.32.201:4369 192.168.32.201:50707
root inetd 861 5 tcp4 192.168.32.201:22 *:*
root sendmail 837 5 tcp4 192.168.32.201:25 *:*
root syslogd 777 6 udp4 192.168.32.201:514 *:*
[root@conductor ~] jexec 2 sockstat -4
USER COMMAND PID FD PROTO LOCAL ADDRESS FOREIGN ADDRESS
root beam 2880 7 tcp4 192.168.32.202:64315 *:*
root beam 2880 8 tcp4 192.168.32.202:57386 192.168.32.202:4369
root beam 2880 10 tcp4 192.168.32.202:50430 192.168.32.40:61473
root beam 2880 11 tcp4 192.168.32.202:61991 192.168.32.201:63037
root epmd 1160 3 tcp4 192.168.32.202:4369 *:*
root epmd 1160 4 tcp4 192.168.32.202:4369 192.168.32.202:57386
root inetd 1024 5 tcp4 192.168.32.202:22 *:*
root sendmail 1002 4 tcp4 192.168.32.202:25 *:*
root syslogd 942 6 udp4 192.168.32.202:514 *:*


Запуск процесса распределенного вычисления, оканчивающегося выдачей результатов. Получение списка присоединенных нод, остановка пула, выход из Erlang shell:
(poolmaster@conductor)13> pwnd:vStart[[1,4,-2,1]], [9,1,11,1], [3,-3,5,1], [1,1,1,1]], [1,5,1,-3]).
ResL = [{module,pwnd},{module,pwnd},{module,pwnd}]
PIDS = [<0.141.0>,<8877.51.0>,<8838.51.0>,<0.142.0>]
poolmaster@conductor: [<0.141.0>/1] V1=[1,4,-2,1] * V2=[1,5,1,-3] == [16,0,0,0]
poolmaster@conductor: [<0.142.0>/4] V1=[1,1,1,1] * V2=[1,5,1,-3] == [0,0,0,4]
crunchpool@j1: [<8838.51.0>/3] V1=[3,-3,5,1] * V2=[1,5,1,-3] == [0,0,-10,0]
crunchpool@j2: [<8877.51.0>/2] V1=[9,1,11,1] * V2=[1,5,1,-3] == [0,22,0,0]
[16,22,-10,4]
(poolmaster@conductor)14> nodes().
[crunchpool@j1,crunchpool@j2]
(poolmaster@conductor)15> pool:stop().
stopped
(poolmaster@conductor)16> nodes().
[]
(poolmaster@conductor)17> q().
ok
(poolmaster@conductor)18>
[root@conductor ~/crunching]#


В примере выше задан ключ -rsh ssh, сообщающий Erlang VM, что доступ к другим нодам должен осуществляться по протоколу SSH. В этой демонстрации доступ к узлам производится по SSH с применением аутентификации через публичный ключ. Можно также использовать другие методы аутентификации, например, Kerberos. Мастер-нода автоматически запускает Erlang VM удаленно на каждом из ведомых узлов пула. Т. о. налицо простота ввода новых мощностей в кластер. В идеале можно было бы свести к минимуму объем работы по конфигурированию окружения (ОС, VM etc.) и построить кластер на основе diskless узлов, загружающихся через LAN Boot, тем самым повысив надежность.

Исходя из последнего вывода на stdout смею предположить, что запуск (pspawn) процессов в пуле в отсутствие нагрузки происходит на разных серверах циклически (in a round-robin fashion), в противном случае учитывается загруженность каждого индивидуального вычислительного узла и производится балансировка нагрузки, т. е. запуск каждого нового процесса (именно Erlang процесса в Erlang VM, а не процесса ОС, aka "green process") на одном из наименее загруженных узлов пула.


Исходный код
-module(pwnd).

-export([startSay/0, say/2]).
-export([vSum/2, vMul/2, vVec/3, vStart/2, vSpawn/1,
vReduce/2, vScatter/5, vWorker/1]).

-import(lists).

vSum([], []) ->
[];
vSum([H1 | T1], [H2 | T2]) ->
[H1 + H2] ++ vSum(T1, T2).

vMul([], []) ->
[];
vMul([H1 | T1], [H2 | T2]) ->
[H1 * H2] ++ vMul(T1, T2).

vVec(N, I, EL)->
ZERO = lists:map(fun(X)->0*X end, lists:seq(1, N)),
lists:sublist(ZERO, I - 1) ++
[EL] ++ lists:sublist(ZERO, N - I).

vWorker(PID) ->
receive
{V1, V2, N} ->
P = vVec(length(V1),N,lists:sum(vMul(V1,V2))),
io:format("~w: [~w/~w] V1=~w * V2=~w == ~w~n",
[node(), self(), N, V1, V2, P]),
PID ! {vector, P}
end.

vScatter([], _V, [], 0, _TOT) ->
[];
vScatter([MH | MT], V, [PID | REST], N, TOT) ->
PID ! {MH, V, TOT - N + 1},
vScatter(MT, V, REST, N - 1, TOT).

vReduce(RES, 0) ->
RES;
vReduce(RES, N) ->
receive
{vector, V} ->
vReduce(vSum(RES, V), N - 1)
end.

vSpawn([]) ->
[];
vSpawn([_H | T]) ->
[pool:pspawn_link(
pwnd,
vWorker,
[self()]
)] ++ vSpawn(T).

vStart(M, V) ->
% Distribute code
{Mod, Bin, Filename} = code:get_object_code(?MODULE),
{ResL, []} = rpc:multicall(
code,
load_binary,
[Mod, Filename, Bin]
),
io:format("ResL = ~w~n", [ResL]),
ZERO = lists:map(fun(X)->0*X end,
lists:seq(1, length(V))
),
PIDS = vSpawn(lists:seq(1, length(V))),
io:format("PIDS = ~w~n", [PIDS]),
vScatter(M, V, PIDS, length(PIDS), length(PIDS)),
vReduce(ZERO, length(V)).

Заключение
Таким образом, отредактировав всего пару конфигурационных файлов и заменив spawn на pspawn, мы получили гибкое распределенное приложение, способное реагировать на изменения кода.
Для повышения общей производительности есть много вариантов. Например, компиляция BEAM модулей с нативным кодом (HiPE, изначально являлся проектом Upsala University). На тестовой non-SMP одноядерной платформе на базе Celeron 2400 MHz производительность перемножения векторов с количеством элементов ~108 при запуске HiPE модуля увеличилась на 30%. Есть возможность использования портов и драйверов (ports and linked-in drivers) для запуска приложений, написанных на С и других языках. Можно задействовать BLAS. Есть и другие варианты оптимизации.

Ниже, в коде, функция vStart/2 использует автоматический механизм передачи и загрузки модулей (двоичного кода) на узлы кластера. Поэтому отпадает необходимость при изменении программы распространять ее на каждую машину вручную. Кроме того в Erlang реализована возможность горячей замены кода (hotswap) без остановки выполнения приложения, при этом в памяти может существовать 2 версии модуля. Это позволяет существенно снизить или совсем исключить простои, которые являются необходимой мерой при разработке приложений, например, на С, Java и т. д. (подробнее). Некоторые системы, написанные разными разработчиками на Erlang, работают годами и регулярно обновляются.

В 2005 году был представлен проект ERESYE, позволяющий программировать Artificial Intelligence (искусственную разумность:) и экспертные системы в Erlang. Есть пример использования этой технологии.

Т. о. можно наблюдать преимущества функционального программирования в среде Erlang/OTP, в котором за долгую историю языка воплощено и продолжает активно внедряться множество продвинутых технологий, которым даже подражают другие языки, например, Scala (Erlang vs. Scala) и т. д.


Trivia
При загрузке кода в ваш редактор отступы могут сбиться. Для vim в командном режиме перейдите в начало файла (gg) и наберите на клавиатуре команду =G
Это откорректирует отступы.


Источники

3 комментария:

  1. Спасибо, было любопытно почитать. Хороший старт для изучения программирования распределённых приложений.

    ОтветитьУдалить
  2. Этот комментарий был удален автором.

    ОтветитьУдалить
  3. 1. На экспорт объявлено: -export([startSay/0, say/2]). Ф-ции в коде не наблюдаются.

    2. Если пользуете -import, то формат следующий:
    -import(Module, [Function1/Arity, ..., FunctionN/Arity]).

    3. Какой смысл работать с pool на ОДНОЙ_И_ТОЙ_ЖЕ машине? Если же есть желание поработать с пулом, то попробуйте настроить хотя бы НЕСКОЛЬКО (физически) хостов! В таком случе, необходимо соблюсти несколько начальных условий:
    а) .hosts.erlang
    b) .erlang.cookie
    c) no passwd access over ssh (from main pool node to all of slaves) (если для erl указан ключь -rsh ssh)

    4. В коде использовано хорошее решение с ... code:get_object_code(?MODULE) ... - нет нужды копировать итоговый beam.

    5. {ResL, []} = rpc:multicall ... Шняга со вторым параметром BadNodes . Если нет желания к анализу "не положительного" раз-та rpc вызова и/или 2й параметр по-любому не интересен, то логичнее записать так { ResL, _ } = ...

    ЗЫ: Пиши еще про Erlang!

    ОтветитьУдалить