ec_plists

plists is a drop-in replacement for module lists, making most list operations parallel. It can operate on each element in parallel, for IO-bound operations; on sublists in parallel, for taking advantage of multi-core machines with CPU-bound operations; and across Erlang nodes, for parallelizing inside a cluster. It handles errors and node failures. It can be configured, tuned, and tweaked to get optimal performance while minimizing overhead.

Almost all the functions are identical to the equivalent functions in lists and return exactly the same result. Each has two forms: one with identical syntax that operates on each element in parallel, and one that takes an additional "malt", a specification for how to parallelize the operation.

fold is the one exception: parallel fold is different from linear fold. This module also includes a simple mapreduce implementation and the function runmany. All the other functions are implemented with runmany, which is a generalization of parallel list operations.

Malts =====

A malt specifies how to break a list into sublists, and can optionally specify a timeout, which nodes to run on, and how many processes to start per node.

Malt = MaltComponent | [MaltComponent]

MaltComponent = SubListSize::integer() | {processes, integer()} | {processes, schedulers} | {timeout, Milliseconds::integer()} | {nodes, [NodeSpec]}

NodeSpec = Node::atom() | {Node::atom(), NumProcesses::integer()} | {Node::atom(), schedulers}

An integer can be given to specify the exact size for sublists. 1 is a good choice for IO-bound operations and when the operation on each list element is expensive. Larger numbers minimize overhead and are faster for cheap operations.
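
As a rough illustration of what an integer malt means, the sketch below splits a list into consecutive sublists of a given size using only the standard lists module. The module name malt_split_sketch and the function split/2 are hypothetical and not part of plists. This documentation does not show how plists divides the list internally, so treat this as a model that assumes the last sublist simply holds whatever elements remain.

```erlang
-module(malt_split_sketch).
-export([split/2]).

%% Split List into consecutive sublists of at most Size elements.
%% Hypothetical helper for illustration only; not part of plists.
split(Size, List) when is_integer(Size), Size > 0 ->
    split(Size, List, []).

split(_Size, [], Acc) ->
    lists:reverse(Acc);
split(Size, List, Acc) when length(List) =< Size ->
    %% Fewer than Size elements left: they form the final sublist.
    lists:reverse([List | Acc]);
split(Size, List, Acc) ->
    {Chunk, Rest} = lists:split(Size, List),
    split(Size, Rest, [Chunk | Acc]).
```

With a size of 1 every element ends up in its own sublist, which is the case described above as suitable for IO-bound or expensive per-element work; larger sizes produce fewer, bigger units of work.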

If the integer is omitted and you have specified `{processes, X}`, the list is split into X sublists. This is only useful when the time to process each element is close to identical and you know exactly how many lines of execution are available to you.

If neither of the above applies, the sublist size defaults to 1.

You can use `{processes, X}` to have the list processed by X processes on the local machine. A good choice for X is the number of lines of execution (cores) the machine provides. This can be done automatically with `{processes, schedulers}`, which sets the number of processes to the number of schedulers in the Erlang virtual machine (probably equal to the number of cores).

`{timeout, Milliseconds}` specifies a timeout. This is a timeout for the entire operation, both operating on the sublists and combining the results. exit(timeout) is evaluated if the timeout is exceeded.

`{nodes, NodeList}` specifies that the operation should be done across nodes. Every element of NodeList is of the form `{NodeName, NumProcesses}` or NodeName, which means the same as `{NodeName, 1}`. plists runs NumProcesses processes on NodeName concurrently. A good choice for NumProcesses is the number of lines of execution (cores) a node provides plus one. This ensures the node is completely busy even when fetching a new sublist. This can be done automatically with `{NodeName, schedulers}`, in which case plists uses a cached value if it has one, and otherwise finds the number of schedulers in the remote node and adds one. This will ensure at least one busy process per core (assuming the node has a scheduler for each core).

plists is able to recover if a node goes down. If all nodes go down, exit(allnodescrashed) is evaluated.

Any of the above may be used as a malt, or may be combined into a list. `{nodes, NodeList}` and `{processes, X}` may not be combined.

Examples ========

%% start a process for each element (1-element sublists)
1

%% start a process for each ten elements (10-element sublists)
10

%% split the list into two sublists and process in two processes
{processes, 2}

%% split the list into X sublists and process in X processes,
%% where X is the number of cores in the machine
{processes, schedulers}

%% split the list into 10-element sublists and process in two processes
[10, {processes, 2}]

%% timeout after one second. Assumes that a process should be started
%% for each element.
{timeout, 1000}

%% Runs 3 processes at a time on apple@desktop, and 2 on orange@laptop
%% This is the best way to utilize all the CPU-power of a dual-core
%% desktop and a single-core laptop. Assumes that the list should be
%% split into 1-element sublists.
{nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}

%% Like above, but makes plists figure out how many processes to use.
{nodes, [{apple@desktop, schedulers}, {orange@laptop, schedulers}]}

%% Gives apple and orange three seconds to process the list as
%% 100-element sublists.
[100, {timeout, 3000}, {nodes, [{apple@desktop, 3}, {orange@laptop, 2}]}]

Aside: Why Malt? ================

I needed a word for this concept, so maybe my subconscious gave me one by making me misspell multiply. Maybe it is an acronym for Malt is A List Tearing Specification. Maybe it is a beer metaphor, suggesting that code only runs in parallel if bribed with spirits. It's jargon, learn it or you can't be part of the in-group.

Messages and Errors ===================

plists ensures that no extraneous messages are left in, or will later enter, the message queue. This is guaranteed even in the event of an error.

Errors in spawned processes are caught and propagated to the calling process. If you invoke

plists:map(fun (X) -> 1/X end, [1, 2, 3, 0]).

you get a badarith error, exactly like when you use lists:map.
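
For reference, this is what that error looks like with the standard lists:map when wrapped in try ... catch. The module and function names are hypothetical, and the sketch deliberately calls lists:map rather than plists:map. It assumes, as the text above states but this sketch does not verify, that the parallel version surfaces the same error in the caller.

```erlang
-module(map_error_sketch).
-export([safe_inverse_all/1]).

%% Hypothetical helper: invert every number in List and turn the badarith
%% error raised by a division by zero into a tagged tuple.
%% Uses lists:map only; swapping in a parallel map is an assumption.
safe_inverse_all(List) ->
    try lists:map(fun(X) -> 1 / X end, List) of
        Result -> {ok, Result}
    catch
        error:badarith -> {error, badarith}
    end.
```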

plists uses monitors to watch the processes it spawns. It is not a good idea to invoke plists when you are already monitoring processes. If one of them does a non-normal exit, plists receives the 'DOWN' message, believing it to be from one of its own processes. The error propagation system goes into effect, which results in the error occurring in the calling process.

DATA TYPES

el_fun() = (term()) -> term()
fuse() = fuse_fun() | {recursive, fuse_fun()} | {reverse, fuse_fun()}
fuse_fun() = (term(), term()) -> term()
malt() = malt_component() | [malt_component()]
malt_component() = integer() | {processes, integer()} | {processes, schedulers} | {timeout, Milliseconds::integer()} | {nodes, [node_spec()]}
node_spec() = atom() | {Node::atom(), NumProcesses::integer()} | {Node::atom(), schedulers}

Functions


all(Fun::el_fun(), List::list()) -> boolean()

Same semantics as in module lists.

all(Fun::el_fun(), List::list(), Malt::malt()) -> boolean()

Same semantics as in module lists.

any(Fun::function(), List::list()) -> boolean()

Same semantics as in module lists.

any(Fun::function(), List::list(), Malt::malt()) -> boolean()

Same semantics as in module lists.

filter(Fun::function(), List::list()) -> list()

Same semantics as in module lists.

filter(Fun::function(), List::list(), Malt::malt()) -> list()

Same semantics as in module lists.

fold(Fun::function(), InitAcc::term(), List::list()) -> term()

Like below, but assumes 1 as the Malt. This function is almost useless, and is intended only to aid converting code from using lists to plists.

fold(Fun::function(), InitAcc::term(), List::list(), Malt::malt()) -> term()

Like below, but uses the Fun as the Fuse by default.

fold(Fun::function(), Fuse::fuse(), InitAcc::term(), List::list(), Malt::malt()) -> term()

fold is more complex when made parallel. There is no foldl and foldr, because accumulators aren't passed in any defined order. The list is split into sublists which are folded together. Fun is identical to the function passed to lists:fold[lr]: it takes an element and the accumulator and returns a new accumulator. It is used for the initial stage of folding sublists. Fuse fuses together the results: it takes (Result1, Result2) and returns a new result. By default sublists are fused left to right, each result of a fuse being fed into the first argument of the next fuse. The result of the last fuse is the result.

Fusing may also run in parallel using a recursive algorithm, by specifying the fuse as {recursive, Fuse}. See the discussion in runmany/4.

Malt is the malt for the initial folding of sublists, and for the possible recursive fuse.
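
The two stages described above can be modelled sequentially with the standard lists module. This is only a sketch: pfold_sketch and its functions are hypothetical names, the sublists are passed in directly instead of being derived from a malt, and it assumes that every sublist fold starts from the same InitAcc, which this documentation does not state explicitly.

```erlang
-module(pfold_sketch).
-export([fold/4]).

%% Sequential model of a parallel fold with a linear, left-to-right fuse.
%% Sublists stands in for the sublists a malt would produce.
%% Assumption: each sublist is folded starting from the same InitAcc.
fold(Fun, Fuse, InitAcc, Sublists) ->
    %% Stage 1: fold every sublist on its own.
    Results = [lists:foldl(Fun, InitAcc, Sub) || Sub <- Sublists],
    %% Stage 2: fuse the per-sublist results, left to right.
    fuse_left(Fuse, Results).

%% Feed the result of each fuse into the first argument of the next one.
%% An empty list of sublists is not handled in this sketch.
fuse_left(_Fuse, [Result]) ->
    Result;
fuse_left(Fuse, [R1, R2 | Rest]) ->
    fuse_left(Fuse, [Fuse(R1, R2) | Rest]).
```

Counting elements shows why Fun and Fuse can differ: Fun would be fun(_, Acc) -> Acc + 1 end, while Fuse has to add two partial counts, fun(A, B) -> A + B end. Under the assumption above, InitAcc should also be neutral for Fuse (0 for a sum or a count), since it is used once per sublist.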

foreach(Fun::function(), List::list()) -> ok

Similar to foreach in module lists, except that it makes no guarantee about the order in which it processes list elements.

foreach(Fun::function(), List::list(), Malt::malt()) -> ok

Similar to foreach in module lists, except that it makes no guarantee about the order in which it processes list elements.

map(Fun::function(), List::list()) -> list()

Same semantics as in module lists.

map(Fun::function(), List::list(), Malt::malt()) -> list()

Same semantics as in module lists.

ftmap(Fun::function(), List::list()) -> list()

Values are returned as {value, term()}.

ftmap(Fun::function(), List::list(), Malt::malt()) -> list()

Values are returned as {value, term()}.

partition(Fun::function(), List::list()) -> {list(), list()}

Same semantics as in module lists.

partition(Fun::function(), List::list(), Malt::malt()) -> {list(), list()}

Same semantics as in module lists.

sort(List::list()) -> list()

Same semantics as in module lists.

sort(Fun::function(), List::list()) -> list()

Same semantics as in module lists.

sort(Fun::function(), List::list(), Malt::malt()) -> list()

This version lets you specify your own malt for sort.

sort splits the list into sublists and sorts them, then merges the sorted lists together. Both steps are done in parallel. Each sublist is sorted in a separate process, and each merging of results is done in a separate process. Malt defaults to 100, causing the list to be split into 100-element sublists.
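
A sequential model of this split, sort, and merge scheme might look like the following. The module psort_sketch is hypothetical, the sublists are supplied directly instead of being derived from a malt, and the sketch runs in a single process, so it shows only the data flow and not the parallelism.

```erlang
-module(psort_sketch).
-export([sort/1]).

%% Sequential model: sort each sublist, then merge the sorted results.
%% Hypothetical helper; the real work is spread over separate processes.
sort(Sublists) ->
    Sorted = [lists:sort(Sub) || Sub <- Sublists],
    merge_all(Sorted).

%% Merge already-sorted lists two at a time until one list remains.
merge_all([]) ->
    [];
merge_all([Result]) ->
    Result;
merge_all([S1, S2 | Rest]) ->
    merge_all([lists:merge(S1, S2) | Rest]).
```

A model of usort could be built the same way with lists:usort and lists:umerge in place of lists:sort and lists:merge.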

usort(List::list()) -> list()

Same semantics as in module lists.

usort(Fun::function(), List::list()) -> list()

Same semantics as in module lists.

usort(Fun::function(), List::list(), Malt::malt()) -> list()

This version lets you specify your own malt for usort.

usort splits the list into sublists and sorts them, then merges the sorted lists together. Both steps are done in parallel. Each sublist is sorted in a separate process, and each merging of results is done in a separate process. Malt defaults to 100, causing the list to be split into 100-element sublists.

usort removes duplicate elements while it sorts.

mapreduce(MapFunc, List::list()) -> dict()

  • MapFunc = (term()) -> DeepListOfKeyValuePairs
  • DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key::term(), Value::term()}

Like below, but assumes a default MapMalt of 1.

mapreduce(MapFunc, List, MapMalt) -> term()

mapreduce(MapFunc, List::list(), InitState::term(), ReduceFunc, MapMalt::malt()) -> dict()

  • MapFunc = (term()) -> DeepListOfKeyValuePairs
  • DeepListOfKeyValuePairs = [DeepListOfKeyValuePairs] | {Key::term(), Value::term()}
  • ReduceFunc = (OldState::term(), Key::term(), Value::term()) -> NewState::term()

This is a very basic mapreduce. You won't write a Google-rivaling search engine with it. It has no equivalent in lists. Each element in the list is run through the MapFunc, which produces either a {Key, Value} pair, a list of key-value pairs, or a list of lists of key-value pairs, and so on. A reducer process runs in parallel with the mapping processes, collecting the key-value pairs. It starts with a state given by InitState, and for each {Key, Value} pair that it receives it invokes ReduceFunc(OldState, Key, Value) to compute its new state. mapreduce returns the reducer's final state.

MapMalt is the malt for the mapping operation, with a default value of 1, meaning each element of the list is mapped by a separate process.
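
The data flow of the five-argument form can be sketched sequentially with the standard library. Everything named here (mapreduce_sketch, mapreduce/4, word_count/1) is hypothetical. The sketch flattens the mapper output and feeds the pairs to ReduceFunc in list order, whereas the real reducer receives pairs as the mapping processes deliver them, so a ReduceFunc that depends on arrival order is probably unsafe. It assumes OTP 20 or later for string:lexemes/2.

```erlang
-module(mapreduce_sketch).
-export([mapreduce/4, word_count/1]).

%% Sequential model of mapreduce: no processes, no malt.
%% MapFunc may return a pair, a list of pairs, or a deeper list of pairs.
mapreduce(MapFunc, List, InitState, ReduceFunc) ->
    %% lists:flatten/1 removes list nesting but leaves the tuples intact.
    Pairs = lists:flatten([MapFunc(Elem) || Elem <- List]),
    lists:foldl(fun({Key, Value}, State) -> ReduceFunc(State, Key, Value) end,
                InitState, Pairs).

%% Example: count words across lines, keeping the counts in a map.
word_count(Lines) ->
    MapFunc = fun(Line) ->
                      [{Word, 1} || Word <- string:lexemes(Line, " ")]
              end,
    ReduceFunc = fun(Counts, Word, N) ->
                         maps:update_with(Word, fun(C) -> C + N end, N, Counts)
                 end,
    mapreduce(MapFunc, Lines, #{}, ReduceFunc).
```

Going by the signature above, the parallel call would take the same MapFunc, InitState, and ReduceFunc, followed by a MapMalt such as 1.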

mapreduce requires OTP R11B, or it may leave monitoring messages in the message queue.

runmany(Fun::function(), Fuse::fuse(), List::list()) -> term()

Like below, but assumes a Malt of 1, meaning each element of the list is processed by a separate process.

runmany(Fun::([term()]) -> term(), Fuse::fuse(), List::list(), Malt::malt()) -> term()

All of the other functions are implemented with runmany. runmany takes a List, splits it into sublists, and starts processes to operate on each sublist, all done according to Malt. Each process passes its sublist into Fun and sends the result back.

The results are then fused together to get the final result. There are two ways this can operate, linearly and recursively. If Fuse is a function, a fuse is done linearly left-to-right on the sublists: the results of processing the first and second sublists are passed to Fuse, then the result of the first fuse and the result of processing the third sublist, and so on. If Fuse is {reverse, FuseFunc}, then a fuse is done right-to-left: the results of processing the second-to-last and last sublists are passed to FuseFunc, then the result of processing the third-to-last sublist and the result of the first fuse, and so forth. Both methods preserve the original order of the list's elements.
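
The two linear orders can be made concrete with a small sequential model. The module fuse_order_sketch is hypothetical and works on a list of already-computed sublist results; it only mirrors the argument order described above and does not spawn any processes.

```erlang
-module(fuse_order_sketch).
-export([fuse/2]).

%% Results holds the per-sublist results in sublist order (non-empty).
%% Hypothetical model of the two linear fuse orders.
fuse(Fuse, Results) when is_function(Fuse, 2) ->
    %% Left to right: Fuse(Fuse(R1, R2), R3) and so on.
    [First | Rest] = Results,
    lists:foldl(fun(R, Acc) -> Fuse(Acc, R) end, First, Rest);
fuse({reverse, FuseFunc}, Results) ->
    %% Right to left: FuseFunc(R1, FuseFunc(R2, R3)) and so on.
    [Last | Rest] = lists:reverse(Results),
    lists:foldl(fun(R, Acc) -> FuseFunc(R, Acc) end, Last, Rest).
```

Using fun(A, B) -> {A, B} end as the fuse function makes the nesting visible: the results [a, b, c] give {{a, b}, c} for the plain function and {a, {b, c}} for the reverse form. With a fuse such as fun(A, B) -> A ++ B end both orders produce the same list, which matches the statement that element order is preserved.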

To do a recursive fuse, pass Fuse as {recursive, FuseFunc}. The recursive fuse makes no guarantee about the order in which the results of sublists, or the results of fuses, are passed to FuseFunc. It continues fusing pairs of results until it is down to one.

Recursive fuse is done in parallel with processing the sublists, and a process is spawned to fuse each pair of results. It is a parallelized algorithm. Linear fuse is done after all results of processing sublists have been collected, and can only run in a single process.

Even if you pass {recursive, FuseFunc}, a recursive fuse is only done if the malt contains {nodes, NodeList} or {processes, X}. If this is not the case, a linear fuse is done.
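
The idea of fusing pairs until one result is left can be modelled as follows. This is a single-process sketch with hypothetical names. It pairs neighbouring results for simplicity, whereas the recursive fuse described above promises no particular pairing or argument order, so a FuseFunc used with it should probably be insensitive to both (addition, max, or a merge of sorted lists, for example).

```erlang
-module(recursive_fuse_sketch).
-export([fuse/2]).

%% Sequential model of a pairwise fuse. Hypothetical helper, not part of
%% plists; the real version fuses each pair in its own process.
fuse(_FuseFunc, [Result]) ->
    Result;
fuse(FuseFunc, [_, _ | _] = Results) ->
    fuse(FuseFunc, fuse_pairs(FuseFunc, Results)).

%% One round: fuse neighbouring pairs; an odd leftover is carried over
%% unchanged into the next round.
fuse_pairs(FuseFunc, [R1, R2 | Rest]) ->
    [FuseFunc(R1, R2) | fuse_pairs(FuseFunc, Rest)];
fuse_pairs(_FuseFunc, Rest) ->
    Rest.
```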

Stephen Marsh