深入浅出 Mnesia-schema 创建 (1)
Erlang的Mnesia数据库如何创建schema
Mnesia是什么
Mnesia是一个分布式数据库管理系统(DBMS),适合于电信和其它需要持续运行和具备软实 时特性的Erlang应用,是构建电信应用的控制系统平台开放式电信平台(OTP)的一部分。 从 这里可以看出Mnesia是Erlang/OTP平台内置的数据库。开发该数据库的原因是电信应用苛刻 的容错和高可靠性需求,这些需求如下:
- 实时快速的键/值检索
- 非实时的复杂查询主要在运营和维护时进行
- 分布式的应用,从而数据也必须分布
- 高容错性
- 可动态重新配置
- 存储复杂的对象数据
如何使用Mnesia
Mnesia作为一个数据库,使用的时候就有一定的要求,相对于其它数据库而言,这些需求是 非常简单的。使用Mnesia需要满足以下需求:
- 操作系统可以运行Erlang/OTP平台
- 已经创建Mnesia的schema表
满足这两点Mnesia就可以使用了。本文将介绍Mnesia是如何创建schema表的
Mnesia的cstruct结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
-record(cstruct, {name, % Atom 表名字 type = set, % set | bag ram_copies = [], % [Node] disc_copies = [], % [Node] disc_only_copies = [], % [Node] load_order = 0, % Integer access_mode = read_write, % read_write | read_only majority = false, % true | false index = [], % [Integer] snmp = [], % Snmp Ustruct local_content = false, % true | false record_name = {bad_record_name}, % Atom (Default = Name) 表中存放的record的名字 attributes = [key, val], % [Atom] record中的属性名字 user_properties = [], % [Record] frag_properties = [], % [{Key, Val] storage_properties = [], % [{Key, Val] cookie = ?unique_cookie, % Term version = {{2, 0}, []}}). % {{Integer, Integer}, [Node]} |
Erlang的cstruct非常简明扼要的定义了Mnesia的一张表的属性。对Mnesia来讲,一张表最 基本需要包含下面的信息
- name,表名字
- type,存储模式
- access_mode,访问权限
- record_name,存储字段的record名称
- attributes,字段名称
剩下的字段,更多是和集群,容错以及分片相关的。因为有分片技术的存在,就不要再说 Mnesia存储上限是4G啥的了。
schema创建
确认无schema阶段
在mnesia_bup的create_schema中会使用mnesia_schema:ensure_no_schema
来确认单节点 或集群的所有节点上都没有schema相关的数据。
1 2 3 4 5 6 7 8 9 10 11 |
%%尝试读取远程的schema ensure_no_schema([H|T]) when is_atom(H) -> case rpc:call(H, ?MODULE, remote_read_schema, []) of {badrpc, Reason} -> %% 返回建表失败 {H, {"All nodes not running", H, Reason}}; {ok,Source, _} when Source /= default -> %% 返回的source是非default的时候,就代表已经存在了schema表 {H, {already_exists, H}}; _ -> ensure_no_schema(T) end; ensure_no_schema([H|_]) -> {error,{badarg, H}}; ensure_no_schema([]) -> ok. |
ensure_no_schema是通过Erlang/OTP平台的rpc模块来尝试读取所有节点是否存在,如果存 在了会告诉发起创建请求的进程already_exists,如果某个节点无法链接,就会报错。如果 在这阶段出现异常,会立刻终止创建。
构建临时备份阶段
mnesia_bup会在mnesia数据目录下创建一个节点名+时间戳的临时文件,类似 [email protected] 这种形式。之后会通过make_initial_backup来从0构 建一个backup文件,用来创建Mnesia的schema。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
make_initial_backup(Ns, Opaque, Mod) -> %%获取最开始的元数据表 %%元数据是cstruct的[{key,value}]形式 Orig = mnesia_schema:get_initial_schema(disc_copies, Ns), %% 删除掉storage_properties和majority这两个字段 Modded = proplists:delete(storage_properties, proplists:delete(majority, Orig)), %% 向schema表中写入表名和cstruct Schema = [{schema, schema, Modded}], O2 = do_apply(Mod, open_write, [Opaque], Opaque), %写入日志头 %% 包括日志版本,日志类型,mnesia版本,节点名称,生成时间 %% 这里日志版本1.2 类型 backup_log O3 = do_apply(Mod, write, [O2, [mnesia_log:backup_log_header()]], O2), %写入schema数据 O4 = do_apply(Mod, write, [O3, Schema], O3), %%生成Opaque所代表的文件 O5 = do_apply(Mod, commit_write, [O4], O4), {ok, O5}. |
先通过mnesia_schema:get_initial_schema
构建出一个schema的cstruct结构,然后通过 mnesia_backup的日志模式,[{schema, schema, Modded}]写入的日志文件中。在写入真实 数据前,会先写入一个mnesia_log:backup_log_header()
的日志头,用来说明是什么日志和 日志的版本。
安装备份阶段
mnesia_bup会使用do_install_fallback来将上一个阶段构建出来的临时备份安装到单节点 或集群上。 安装过程可以明确为以下几步:
- 初始化安装进程
- 初始化安装状态
- 在集群个节点上创建fallback_receiver
- 从上阶段临时文件中读取数据,并同步到集群各个节点上
1 2 3 4 5 6 7 8 9 10 11 12 13 |
install_fallback_master(ClientPid, FA) -> %% 捕获退出异常,关联进程崩溃,但是并不真正捕获 %% 而是防止崩溃后引起当前进程退出,打断元数据创建 process_flag(trap_exit, true), %% 设置状态 State = {start, FA}, %% 拿出日志文件 Opaque = FA#fallback_args.opaque, Mod = FA#fallback_args.module, Res = (catch iterate(Mod, fun restore_recs/4, Opaque, State)), unlink(ClientPid), ClientPid ! {self(), Res}, exit(shutdown). |
安装备份文件的时候,会创建一个进程来进行备份文件安装。该进程为了防止请求进程打断 安装过程,会进行退出异常捕获。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
-spec fallback_receiver(pid(), fallback_args()) -> no_return(). %Master,在此处表示,整个mnesia集群在create_schema的时候的发起者 fallback_receiver(Master, FA) -> process_flag(trap_exit, true), %将自己注册到本地名字库,防止创建出另一个fallback_receiver进程 case catch register(mnesia_fallback, self()) of {'EXIT', _} -> Reason = {already_exists, node()}, local_fallback_error(Master, Reason); true -> FA2 = check_fallback_dir(Master, FA), Bup = FA2#fallback_args.fallback_bup, %检查是否有backup case mnesia_lib:exists(Bup) of true -> %如果有则报错 Reason2 = {already_exists, node()}, local_fallback_error(Master, Reason2); false -> %如果没有,创建新的backup的临时文件 Mod = mnesia_backup, %% 删除FALLBACK.TMP文件 Tmp = FA2#fallback_args.fallback_tmp, R = #restore{mode = replace, bup_module = Mod, bup_data = Tmp}, file:delete(Tmp), %开始接收fallback信息 case catch fallback_receiver_loop(Master, R, FA2, schema) of {error, Reason} -> local_fallback_error(Master, Reason); Other -> exit(Other) end end end. |
fallback_receiver进程会在集群的每个节点上创建一个,其中的Master就是上面所说的 install_fallback_master这个函数所在的进程。fallback_receiver只是做一些基本的防止 重入和错误检查,真正的业务是在fallback_receiver_loop函数中处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
fallback_receiver_loop(Master, R, FA, State) -> receive {Master, {start, Header, Schema}} when State =:= schema -> Dir = FA#fallback_args.mnesia_dir, throw_bad_res(ok, mnesia_schema:opt_create_dir(true, Dir)), %% 创建FALLBACK.TMP文件 R2 = safe_apply(R, open_write, [R#restore.bup_data]), R3 = safe_apply(R2, write, [R2#restore.bup_data, [Header]]), BupSchema = [schema2bup(S) || S <- Schema], R4 = safe_apply(R3, write, [R3#restore.bup_data, BupSchema]), Master ! {self(), ok}, %% schema的日志已经写入文件了 %% 状态切换到接收records fallback_receiver_loop(Master, R4, FA, records); {Master, {records, Recs}} when State =:= records -> R2 = safe_apply(R, write, [R#restore.bup_data, Recs]), Master ! {self(), ok}, fallback_receiver_loop(Master, R2, FA, records); %收到swap,进行commit,并将临时文件重命名为backup文件 {Master, swap} when State =/= schema -> ?eval_debug_fun({?MODULE, fallback_receiver_loop, pre_swap}, []), safe_apply(R, commit_write, [R#restore.bup_data]), Bup = FA#fallback_args.fallback_bup, Tmp = FA#fallback_args.fallback_tmp, %% 立刻重命名文件,将FALLBACK.TMP重命名为FALLBACK.BUP throw_bad_res(ok, file:rename(Tmp, Bup)), catch mnesia_lib:set(active_fallback, true), ?eval_debug_fun({?MODULE, fallback_receiver_loop, post_swap}, []), Master ! {self(), ok}, fallback_receiver_loop(Master, R, FA, stop); {Master, stop} when State =:= stop -> stopped; Msg -> safe_apply(R, abort_write, [R#restore.bup_data]), Tmp = FA#fallback_args.fallback_tmp, file:delete(Tmp), throw({error, "Unexpected msg fallback_receiver_loop", Msg}) end. |
fallback_receiver_loop循环State的初始值为{start,FA},接着不断从发起者出接受 schema数据和record数据,并写入FALLBACK.TMP中,当发起者传送完所有数据会要求 fallback_receiver进程将FALLBACK.TMP文件重命名为FALLBACK.BUP。
总结
至此,schema创建的第一个阶段已经结束,但是发现mnesia数据目录下并没有生成 schema.DAT文件,在后续的文章中将会介绍如何生成schema.DAT文件