Erlang的RPC 模块代码分析
Erlang作为一个并发和集群性的语言和平台,其中RPC就必不可少。本文对Erlang自身携带 的RPC模块进行分析
RPC模块的功能
RPC模块的功能主要是为了帮助程序员完成本地节点、两个或多个节点之间的调用。RPC模块 将远程调用设定为一定的模式,这样能做到方法和进程的位置透明,简化开发工作。 在RPC 模块中主要的方法有下面几个:
- call 同步调用
- block_call 阻塞同步调用
- cast 广播调用
- abcast 异步广播调用
- sbcast 同步广播调用
当然还有其它很多方法,就不在这里面一一介绍了
RPC的代码分析
进程创建
RPC模块本身是一个gen_server会随着kernel模块启动,也就是说,在Erlang/OTP启动后我 们就免费获得了一个RPC进程。 RPC进程启动的时候,会在Erts中通过local注册一个名字 rex的进程,这样没有经过修改的Erlang/OTP都会有这个名字在它的名字列表上。
RPC调用逻辑
不管是同步调用还是广播调用,在RPC模块中的调用都是依赖gen_server的相关方和 erlang:send方法来完成。这样尽最大可能的重用代码,保证了整个OTP中对远程调用的表现 的一致性。 并且RPC模块不单单可以调用远程节点的方法或进程,也可以调用本地节点的方 法或进程,这样保证了整个RPC的系统位置透明性,并且RPC模块针对本地节点作了相关优化。
例如说call方法针对本地节点就采用了下面的方法:
1 2 3 4 5 |
local_call(M, F, A) when is_atom(M), is_atom(F), is_list(A) -> case catch apply(M, F, A) of {'EXIT',_} = V -> {badrpc, V}; Other -> Other end. |
call和block_call方法
这两个方法都是同步的调用,但是实现的细节非常不同,对rex进程的影响也是不同的。当 然使用两个方法在并发执行的情况下,得到的结果是完全不同的。 不管是call也好, block_call也好,都会在执行阶段暂时的将被调用者进程的console输出重定向到调用者进 程所在节点的group leader上。
call方法
在调用发起者一侧,RPC模块会立刻建立一个监控下的Erlang进程,并在该进程内通过 gen_server:call方法来调用远程节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
do_call(Node, Request, infinity) -> rpc_check(catch gen_server:call({?NAME,Node}, Request, infinity)); do_call(Node, Request, Timeout) -> Tag = make_ref(), {Receiver,Mref} = erlang:spawn_monitor( fun() -> process_flag(trap_exit, true), Result = gen_server:call({?NAME,Node}, Request, Timeout), exit({self(),Tag,Result}) end), receive {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> rpc_check(Result); {'DOWN',Mref,_,_,Reason} -> rpc_check_t({'EXIT',Reason}) end. |
RPC模块会将超时或对方节点失去连接的情况处理为bad_rpc,让顶层逻辑发现并非业务本身引起的远程调用问题。
在被调用者一些,RPC模块也会立刻创建一个监控下的Erlang进程,并在该进程内处理调用 者的call消息,同时会将相关信息保存在rex进程的进程上下文中。当新的进程完成了业务 处理,就会把处理结果返回给被调用者节点的rex进程,然后再将结果返回给调用发起者。 我们可以仔细观察它的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
handle_call_call(Mod, Fun, Args, Gleader, To, S) -> RpcServer = self(), %% Spawn not to block the rpc server. {Caller,_} = erlang:spawn_monitor( fun () -> set_group_leader(Gleader), Reply = case catch apply(Mod, Fun, Args) of {'EXIT', _} = Exit -> {badrpc, Exit}; Result -> Result end, RpcServer ! {self(), {reply, Reply}} end), {noreply, gb_trees:insert(Caller, To, S)}. |
block_call方法
于call方法一样,在调用发起者一侧,RPC模块会立刻建立一个监控下的Erlang进程,并在 该进程内通过gen_server:call方法来调用远程节点。
但是在被调用者一些,RPC模块会选择使用被调用者所在节点的rex直接执行相关代码
1 2 3 4 5 6 7 8 9 10 11 12 |
handle_call({block_call, Mod, Fun, Args, Gleader}, _To, S) -> MyGL = group_leader(), set_group_leader(Gleader), Reply = case catch apply(Mod,Fun,Args) of {'EXIT', _} = Exit -> {badrpc, Exit}; Other -> Other end, group_leader(MyGL, self()), % restore {reply, Reply, S}; |
同步调用总结
call方法可以保证,同一调用者的远程请求按序列执行,但是不保证多个调用者的远程请求 按序列执行。 block_call方法保证,多个调用者的远程请求按序列执行。 不管是call还是 block_call的方法都会给调用者带来大量的进程创建的压力(Erlang创建进程很快,但不代 表没有代价)。 call方法还会给被调用者节点带来大量的进程创建压力。
cast方法
RPC模块的cast方法直接依赖于gen_sever:cast,并没有做更多的事情。针对本地节点, cast方法会在调用者节点内创建一个进程来执行相关代码:
1 2 3 4 5 6 |
cast(Node, Mod, Fun, Args) when Node =:= node() -> catch spawn(Mod, Fun, Args), true; cast(Node, Mod, Fun, Args) -> gen_server:cast({?NAME,Node}, {cast,Mod,Fun,Args,group_leader()}), true. |
被调用者接收到消息后会立刻创建进程执行相关代码:
1 2 3 4 5 6 |
handle_cast({cast, Mod, Fun, Args, Gleader}, S) -> spawn(fun() -> set_group_leader(Gleader), apply(Mod, Fun, Args) end), {noreply, S}; |
cast方法总结
cast方法是非常简单的。和call方法一样,会给被调用者节点带来大量的进程创建压力。 同样不要忘记了,cast方法也会将新创建的进程的console输出重新定向调用者所在节点的 group leader上。
abcast和sbcast
这两个方法都是通过erlang:send
将调用者的消息发送到被调用者节点上。
abcast
abcast采用的是纯异步,发出去就不管了,直接将消息不经过rex进程直接发送到目标进程 上
1 2 3 4 5 6 7 8 9 10 11 12 13 |
abcast(Name, Mess) -> abcast([node() | nodes()], Name, Mess). abcast([Node|Tail], Name, Mess) -> Dest = {Name,Node}, %这么做的好处是不会让进程被trap %从而保证了异步性 case catch erlang:send(Dest, Mess, [noconnect]) of noconnect -> spawn(erlang, send, [Dest,Mess]), ok; _ -> ok end, abcast(Tail, Name, Mess); abcast([], _,_) -> abcast. |
此处abcast完全是异步的,如果发现了目标节点是没有连接的时候,直接创建一个新的进程 来进行消息发送,完全不会进入Trap状态等待节点连接。
sbcast
sbcast算是同步的广播方式,发送后会回收广播结果,并且当节点没有完成连接的时候,会 进入Trap状态等待节点连接完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
sbcast(Name, Mess) -> sbcast([node() | nodes()], Name, Mess). sbcast(Nodes, Name, Mess) -> Monitors = send_nodes(Nodes, ?NAME, {sbcast, Name, Mess}, []), rec_nodes(?NAME, Monitors). send_nodes([Node|Tail], Name, Msg, Monitors) when is_atom(Node) -> Monitor = start_monitor(Node, Name), %% Handle non-existing names in rec_nodes. catch {Name, Node} ! {self(), Msg}, send_nodes(Tail, Name, Msg, [Monitor | Monitors]); send_nodes([_Node|Tail], Name, Msg, Monitors) -> %% Skip non-atom _Node send_nodes(Tail, Name, Msg, Monitors); send_nodes([], _Name, _Req, Monitors) -> Monitors. rec_nodes(Name, Nodes) -> rec_nodes(Name, Nodes, [], []). rec_nodes(_Name, [], Badnodes, Replies) -> {Replies, Badnodes}; rec_nodes(Name, [{N,R} | Tail], Badnodes, Replies) -> receive {'DOWN', R, _, _, _} -> rec_nodes(Name, Tail, [N|Badnodes], Replies); {?NAME, N, {nonexisting_name, _}} -> %% used by sbcast() erlang:demonitor(R, [flush]), rec_nodes(Name, Tail, [N|Badnodes], Replies); {Name, N, Reply} -> %% Name is bound !!! erlang:demonitor(R, [flush]), rec_nodes(Name, Tail, Badnodes, [Reply|Replies]) end. |
总结
RPC模块是Erlang/OTP当中非常重要的一个模块,它的存在简化了很多编码工作但是也为我 们带来了很多隐患
- 单个gen_server的消息承载能力和block_call的阻塞
- 错误使用时,RPC模块大量的进程创建,影响了Erts的调度
- 使用RPC模块来传递大块的数据,引起dist_busy_port
- group leader重定向引起的数据量叠加,引起dist_busy_port
当然这些问题不影响我们正常使用RPC模块,例如我们可以将RPC用于以下场景:
- Erlang集群构建boot阶段
- Erlang集群的元信息和控制信息交换