Skip to content

日志分类:



用Mochiweb打造百万级Comet应用,第三部分

提示:如有转载请注明作者 小游戏 及出处

原文:A Million-user Comet Application with Mochiweb, Part 3

参考资料:Comet–基于 HTTP 长连接、无须在浏览器端安装插件的“服务器推”技术为“Comet”
             MochiWeb–建立轻量级HTTP服务器的Erlang库

在这个系列的第一部分 和第二部分 展示了怎样用mochiweb构建一个comet应用,怎样把消息路由到连接的客户端。 我们完成了把应用内存压缩到每个连接8KB的程度。我们也做了老式的c10k测试, 注意到10,000个连接用户时到底发生了什么。 我们也做了几个图。很有乐趣,但是现在是时候把我们标题所宣称的做好了,把它调优到一百万个连接。

有以下内容:

  • 添加一个发布订阅式的基于Mnesia的订阅数据库
  • 为一百万用户生成一个真实的朋友数据集
  • 调整mnesia载入朋友数据
  • 从一个机子打开一百万连接
  • 有一百万连接用户的基准测试
  • 用Libevent + C进行连接处理
  • 最后的思考

这个测试的挑战之一是实际上一个测试用机实际上只能打开1M个连接。写一个能接收1M个连接的服务器比创建1M个连接用来测试更容易些,所以这篇文章的相当一部分是关于在一台机器上打开1M个连接的技术 。

赶快进行我们的发布订阅

第二部分 我们用路由器给特定用户发送消息。对于聊天/及时通讯系统这是很好的,但是我们有更加有吸引力的事情要做。在我们进行大规模伸缩测试前,让我们再添加一个订阅数据库。我们让应用存储你的朋友是谁,这样, 当你的朋友有些什么事情消息时都会推送给你.

我的意图是把这个用于Last.fm,我能够得到实时的我朋友正在听的歌曲的反馈。他也同样的适合由社会化网络产生的其他信息  Flickr图片上传,Facebook的newsfeed, Twitter的消息,总总。 FriendFeed甚至有一个beta版的实时API,所以这种事确定很热门. (即使我还没有听说过除了Facebook用Erlang做这种事)。

实现订阅管理器

我们正实现一个通用订阅管理器,但是我们将把一个人自动签名给其朋友列表中的人 - 这样你可以认为这就是一个朋友数据库。

订阅管理器API:

  • add_subscriptions([{Subscriber, Subscribee},...])
  • remove_subscriptions([{Subscriber, Subscribee},...])
  • get_subscribers(User)

subsmanager.erl

  1. -module ( subsmanager) .
  2. -behaviour ( gen_server) .
  3. -include( “/usr/local/lib/erlang/lib/stdlib-1.15.4/include/qlc.hrl” ) .
  4. -export ( [ init/1 , handle_call/3 , handle_cast/2 , handle_info/2 , terminate/2 , code_change/3 ] ) .
  5. -export ( [ add_subscriptions/1 ,
  6.          remove_subscriptions/1 ,
  7.          get_subscribers/1 ,
  8.          first_run/0 ,
  9.          stop/0 ,
  10.          start_link /0 ] ) .
  11. -record( subscription, { subscriber, subscribee} ) .
  12. -record( state, { } ) . % state is all in mnesia
  13. -define( SERVER , global:whereis_name ( ?MODULE ) ) .
  14.  
  15. start_link ( ) ->
  16.     gen_server :start_link ( { global, ?MODULE } , ?MODULE , [ ] , [ ] ) .
  17.  
  18. stop( ) ->
  19.     gen_server :call ( ?SERVER , { stop} ) .
  20.  
  21. add_subscriptions( SubsList ) ->
  22.     gen_server :call ( ?SERVER , { add_subscriptions, SubsList } , infinity) .
  23.  
  24. remove_subscriptions( SubsList ) ->
  25.     gen_server :call ( ?SERVER , { remove_subscriptions, SubsList } , infinity) .
  26.  
  27. get_subscribers( User ) ->
  28.     gen_server :call ( ?SERVER , { get_subscribers, User } ) .
  29.  
  30. %%
  31.  
  32. init( [ ] ) ->
  33.     ok = mnesia:start ( ) ,
  34.     io:format ( “Waiting on mnesia tables..\n ” ,[ ] ) ,
  35.     mnesia:wait_for_tables ( [ subscription] , 30000 ) ,
  36.     Info = mnesia:table_info ( subscription, all) ,
  37.     io:format ( “OK. Subscription table info: \n ~w\n \n ” ,[ Info ] ) ,
  38.     { ok, #state{ } } .
  39.  
  40. handle_call( { stop} , _From , State ) ->
  41.     { stop, stop, State } ;
  42.  
  43. handle_call( { add_subscriptions, SubsList } , _From , State ) ->
  44.     % Transactionally is slower:
  45.     % F = fun() ->
  46.     %         [ ok = mnesia:write(S) || S <- SubsList ]
  47.     %     end,
  48.     % mnesia:transaction(F),
  49.     [ mnesia:dirty_write ( S ) || S <- SubsList ] ,
  50.     { reply, ok, State } ; 
  51.  
  52. handle_call( { remove_subscriptions, SubsList } , _From , State ) ->
  53.     F = fun( ) ->
  54.         [ ok = mnesia:delete_object ( S ) || S <- SubsList ]
  55.     end ,
  56.     mnesia:transaction ( F ) ,
  57.     { reply, ok, State } ;
  58.  
  59. handle_call( { get_subscribers, User } , From , State ) ->
  60.     F = fun( ) ->
  61.         Subs = mnesia:dirty_match_object ( #subscription{ subscriber=‘_’ , subscribee=User } ) ,
  62.         Users = [ Dude || #subscription{ subscriber=Dude , subscribee=_} <- Subs ] ,
  63.         gen_server:reply ( From , Users )
  64.     end ,
  65.     spawn( F ) ,
  66.     { noreply, State } .
  67.  
  68. handle_cast( _Msg , State ) -> { noreply, State } .
  69. handle_info( _Msg , State ) -> { noreply, State } .
  70.  
  71. terminate( _Reason , _State ) ->
  72.     mnesia :stop ( ) ,
  73.     ok.
  74.  
  75. code_change( _OldVersion , State , _Extra ) ->
  76.     io :format ( “Reloading code for ?MODULE\n ” ,[ ] ) ,
  77.     { ok, State } .
  78.  
  79. %%
  80.  
  81. first_run( ) ->
  82.     mnesia :create_schema ( [ node( ) ] ) ,
  83.     ok = mnesia:start ( ) ,
  84.     Ret = mnesia:create_table ( subscription,
  85.     [
  86.      { disc_copies, [ node( ) ] } ,
  87.      { attributes, record_info( fields, subscription) } ,
  88.      { index, [ subscribee] } , %index subscribee too
  89.      { type, bag}
  90.     ] ) ,
  91.     Ret .

几点值得注意的:

  • 我包含了qlc.hrl,mnesia用list comprehension做查询时需要,用了绝对路径。那不是最好的方法。
  • get_subscribers 生成另外一个进程且把这个工作委派给他,用gen_server:reply 。这意味这gen_server loop 不能组塞在那个调用上,假如我们抛出大量查找在其上,那么mnesia会慢下来。
  • rr(”subsmanager.erl”). 下面的例子允许你在erl shell中用record定义。把你的record定义写入records.hrl文件并把它包含到你的模块中,这是一种很好的形式,我嵌入它是为了比较简洁。

现在测试他。first_run() 创建 mnesia schema, 因此首先运行它是很重要的。另一个隐含的问题是在mnesia中数据库只能被创建他的那个节点访问,因此给erl shell 一个名称,关联起来。

$ mkdir /var/mnesia
$ erl -boot start_sasl -mnesia dir ‘”/var/mnesia_data”‘ -sname subsman
(subsman@localhost)1> c(subsmanager).
{ok,subsmanager}
(subsman@localhost)2> subsmanager:first_run().

{atomic,ok}
(subsman@localhost)3> subsmanager:start_link().
Waiting on mnesia tables..
OK. Subscription table info:
[{access_mode,read_write},{active_replicas,[subsman@localhost]},{arity,3},{attributes,[subscriber,subscribee]},{checkpoints,[]},{commit_work,[{index,bag,[{3,{ram,57378}}]}]},{cookie,{{1224,800064,900003},subsman@localhost}},{cstruct,{cstruct,subscription,bag,[],[subsman@localhost],[],0,read_write,[3],[],false,subscription,[subscriber,subscribee],[],[],{{1224,863164,904753},subsman@localhost},{{2,0},[]}}},{disc_copies,[subsman@localhost]},{disc_only_copies,[]},{frag_properties,[]},{index,[3]},{load_by_force,false},{load_node,subsman@localhost},{load_order,0},{load_reason,{dumper,create_table}},{local_content,false},{master_nodes,[]},{memory,288},{ram_copies,[]},{record_name,subscription},{record_validation,{subscription,3,bag}},{type,bag},{size,0},{snmp,[]},{storage_type,disc_copies},{subscribers,[]},{user_properties,[]},{version,{{2,0},[]}},{where_to_commit,[{subsman@localhost,disc_copies}]},{where_to_read,subsman@localhost},{where_to_write,[subsman@localhost]},{wild_pattern,{subscription,’_’,’_’}},{{index,3},57378}]


{ok,<0.105.0>}
(subsman@localhost)4> rr(”subsmanager.erl”).
[state,subscription]
(subsman@localhost)5> subsmanager:add_subscriptions([ #subscription{subscriber=alice, subscribee=rj} ]).
ok
(subsman@localhost)6> subsmanager:add_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]).
ok
(subsman@localhost)7> subsmanager:get_subscribers(rj).
[bob,alice]
(subsman@localhost)8> subsmanager:remove_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]).
ok
(subsman@localhost)8> subsmanager:get_subscribers(rj).
[alice]
(subsman@localhost)10> subsmanager:get_subscribers(charlie).
[]

为测试我们将用整数id值标志用户-但这个测试我用原子(rj, alice, bob)且假设alice和bob都在rj的朋友列表中。非常好mnesia (和ets/dets)不关心你用的什么值-任何Erlang term都可以。这意味着给多种支持的资源升级是很简单的。你可以开始用{user, 123}或{photo, 789}描述人们可能订阅的不同的事情 , 不用改变subsmanager模块的任何东西。

Modifying the router to use subscriptions

取代给特定的用户传递消息,也就是router:send(123, "Hello user 123"),我们将用主题标志消息 - 也就是,生成消息的人们(放歌的,上传图片的,等等) - 拥有路由器投递消息给订阅他主题的每个用户。换句话说,将像这样工作:router:send(123, "Hello everyone subscribed to user 123")

Updated router.erl:

  1. -module ( router) .
  2. -behaviour ( gen_server) .
  3.  
  4. -export ( [ start_link /0 ] ) .
  5. -export ( [ init/1 , handle_call/3 , handle_cast/2 , handle_info/2 ,
  6.      terminate/2 , code_change/3 ] ) .
  7.  
  8. -export ( [ send/2 , login/2 , logout/1 ] ) .
  9.  
  10. -define( SERVER , global:whereis_name ( ?MODULE ) ) .
  11.  
  12. % will hold bidirectional mapping between id <–> pid
  13. -record( state, { pid2id, id2pid} ) .
  14.  
  15. start_link ( ) ->
  16.     gen_server :start_link ( { global, ?MODULE } , ?MODULE , [ ] , [ ] ) .
  17.  
  18. % sends Msg to anyone subscribed to Id
  19. send( Id , Msg ) ->
  20.     gen_server :call ( ?SERVER , { send, Id , Msg } ) .
  21.  
  22. login( Id , Pid ) when is_pid( Pid ) ->
  23.     gen_server :call ( ?SERVER , { login, Id , Pid } ) .
  24.  
  25. logout( Pid ) when is_pid( Pid ) ->
  26.     gen_server :call ( ?SERVER , { logout, Pid } ) .
  27.  
  28. %%
  29.  
  30. init( [ ] ) ->
  31.     % set this so we can catch death of logged in pids:
  32.     process_flag( trap_exit, true) ,
  33.     % use ets for routing tables
  34.     { ok, #state{
  35.                 pid2id = ets:new ( ?MODULE , [ bag] ) ,
  36.                 id2pid = ets:new ( ?MODULE , [ bag] )
  37.                }
  38.     } .
  39.  
  40. handle_call( { login, Id , Pid } , _From , State ) when is_pid( Pid ) ->
  41.     ets :insert ( State #state.pid2id, { Pid , Id } ) ,
  42.     ets:insert ( State #state.id2pid, { Id , Pid } ) ,
  43.     link( Pid ) , % tell us if they exit, so we can log them out
  44.     %io:format(”~w logged in as ~w\n”,[Pid, Id]),
  45.     { reply, ok, State } ;
  46.  
  47. handle_call( { logout, Pid } , _From , State ) when is_pid( Pid ) ->
  48.     unlink ( Pid ) ,
  49.     PidRows = ets:lookup ( State #state.pid2id, Pid ) ,
  50.     case PidRows of
  51.         [ ] ->
  52.             ok ;
  53.         _ ->
  54.             IdRows = [ { I ,P } || { P ,I } <- PidRows ] , % invert tuples
  55.             ets:delete ( State #state.pid2id, Pid ) ,   % delete all pid->id entries
  56.             [ ets:delete_object ( State #state.id2pid, Obj ) || Obj <- IdRows ] % and all id->pid
  57.     end ,
  58.     %io:format(”pid ~w logged out\n”,[Pid]),
  59.     { reply, ok, State } ;
  60.  
  61. handle_call( { send, Id , Msg } , From , State ) ->
  62.     F = fun( ) ->
  63.         % get users who are subscribed to Id:
  64.         Users = subsmanager:get_subscribers ( Id ) ,
  65.         io:format ( “Subscribers of ~w = ~w\n ” ,[ Id , Users ] ) ,
  66.         % get pids of anyone logged in from Users list:
  67.         Pids0 = lists:map (
  68.             fun( U ) ->
  69.                 [ P || { _I , P } <- ets:lookup ( State #state.id2pid, U ) ]
  70.             end ,
  71.             [ Id | Users ] % we are always subscribed to ourselves
  72.         ) ,
  73.         Pids = lists:flatten ( Pids0 ) ,
  74.         io:format ( “Pids: ~w\n ” , [ Pids ] ) ,
  75.         % send Msg to them all
  76.         M = { router_msg, Msg } ,
  77.         [ Pid ! M || Pid <- Pids ] ,
  78.         % respond with how many users saw the message
  79.         gen_server:reply ( From , { ok, length( Pids ) } )
  80.     end ,
  81.     spawn( F ) ,
  82.     { noreply, State } .
  83.  
  84. % handle death and cleanup of logged in processes
  85. handle_info( Info , State ) ->
  86.     case Info of
  87.         { ‘EXIT’ , Pid , _Why } ->
  88.             handle_call ( { logout, Pid } , blah, State ) ;
  89.         Wtf ->
  90.             io :format ( “Caught unhandled message: ~w\n ” , [ Wtf ] )
  91.     end ,
  92.     { noreply, State } .
  93.  
  94. handle_cast( _Msg , State ) ->
  95.     { noreply, State } .
  96. terminate( _Reason , _State ) ->
  97.     ok .
  98. code_change( _OldVsn , State , _Extra ) ->
  99.     { ok, State } .

 

这是一个不需要mochiweb的快速测试 - 我用原子代替用户ID, 为清晰忽略了一些输出

(subsman@localhost)1> c(subsmanager), c(router), rr("subsmanager.erl").
(subsman@localhost)2> subsmanager:start_link().
(subsman@localhost)3> router:start_link().
(subsman@localhost)4> Subs = [#subscription{subscriber=alice, subscribee=rj}, #subscription{subscriber=bob, subscribee=rj}].
[#subscription{subscriber = alice,subscribee = rj},
#subscription{subscriber = bob,subscribee = rj}]
(subsman@localhost)5> subsmanager:add_subscriptions(Subs).
ok
(subsman@localhost)6> router:send(rj, “RJ did something”).
Subscribers of rj = [bob,alice]
Pids: []
{ok,0}
(subsman@localhost)7> router:login(alice, self()).
ok
(subsman@localhost)8> router:send(rj, “RJ did something”).
Subscribers of rj = [bob,alice]
Pids: [<0.46.0>]
{ok,1}
(subsman@localhost)9> receive {router_msg, M} -> io:format(”~s\n”,[M]) end.
RJ did something
ok

这演示了当主题是她订阅的某人 (rj),alice怎样接收一条消息, 即使这条消息不是直接发送给alice的。输出显示路由器尽可能的标志目标为[alice,bob] 但是消息值传给一个人alice, 因为bob还没有登陆。

生成一个典型的社会化网络朋友数据集

我们可以随机的生成大量的朋友关系,但是那样特别不真实。 社会化网络有助于发挥分布规则的力量。社会化网络通常很少有超公众化的用户(一些 Twitter 用户 有超过100,000的追随者) 而是很多的人只有少量的几个朋友。Last.fm朋友数据就是个典型 - 他符合Barabási–Albert 图模型 , 因此它就是我用的类型。

为了生成数据集,我用了很出色的igraph库 的模块:

fakefriends.py:

  1. import igraph
  2. g = igraph.Graph .Barabasi ( 1000000 , 15 , directed=False )
  3. print “Edges: “ + str ( g.ecount ( ) ) + ” Verticies: “ + str ( g.vcount ( ) )
  4. g.write_edgelist ( “fakefriends.txt” )

这产生了用空格分隔的每行2个用户id。这就有了我们要调入subsmanager的朋友关系数据,用户id从1到一百万。

向mnesia大量调入朋友数据

这个小模块读fakefriends.txt文件并创建一个订阅记录列表.

readfriends.erl - 读fakefriends.txt创建订阅记录

  1. -module ( readfriends) .
  2. -export ( [ load/1 ] ) .
  3. -record( subscription, { subscriber, subscribee} ) .
  4.  
  5. load( Filename ) ->
  6.     for_each_line_in_file ( Filename ,
  7.         fun( Line , Acc ) ->
  8.             [ As , Bs ] = string:tokens ( string:strip ( Line , right, $\n) , ” “ ) ,
  9.             { A , _} = string:to_integer ( As ) ,
  10.             { B , _} = string:to_integer ( Bs ) ,
  11.             [ #subscription{ subscriber=A , subscribee=B } | Acc ]
  12.         end , [ read] , [ ] ) .
  13.  
  14.     { ok, Device } = file:open ( Name , Mode ) ,
  15.     for_each_line( Device , Proc , Accum0 ) .
  16.  
  17. for_each_line( Device , Proc , Accum ) ->
  18.     case io:get_line ( Device , “” ) of
  19.         eof  -> file :close ( Device ) , Accum ;
  20.         Line -> NewAccum = Proc ( Line , Accum ) ,
  21.                     for_each_line( Device , Proc , NewAccum )
  22.     end .

现在在subsmanager shell中, 你可以从文本中读数据并添加订阅:

$ erl -name  router at minifeeds4.gs2 +K true +A 128 -setcookie secretcookie -mnesia dump_log_write_threshold 50000 -mnesia dc_dump_limit 40
erl> c(readfriends), c(subsmanager).
erl> subsmanager:first_run().
erl> subsmanager:start_link().
erl> subsmanager:add_subscriptions( readfriends:load(”fakefriends.txt”) ).

注意这额外的mnesia参数 - 这是避免** WARNING ** Mnesia is overloaded 你可能在别的地方看到的警告信息。提到我以前发表的: On bulk loading data into Mnesia 有另外的调入大量数据的方法。最好的解决方案看起来是设置这些选项(在评论中指出的, 谢谢Jacob!) 。Mnesia 参考手册 在Configuration参数中包含了很多其他的设置,值得一看.

调到一百万

在一台主机上创建一百万个tcp连接是可以的。 我有个感觉就是做这个是用个小集群来模拟大量的客户端连接,可能运行一个像Tsung的真实工具。 甚至调整增加内核内存,增加文件描述符限制,设置本地端口范围到最大值,我们将一直坚持打破临时端口的限制。当建立一个tcp连接时,客户端被分配(或者你可以指定)一个端口,范围在 /proc/sys/net/ipv4/ip_local_port_range里 . 假如你手工指定也没什么问题, 用临时端口我们会超出界限。 在第一部分,我们设置这个范围在“1024 65535″之间 - 这就意味这有65535-1024 = 64511个端口可用。他们中的一些将会被别的进程使用,但是我们从没有超过64511个客户连接,因为我们会超出端口界限。

局部端口区间被赋给ip的一段, 因此假如我们是我们输出连接在一个指定的局部端口区间的话我们就能够打开大于64511 个外出连接。

因此让我们弄出17个新的IP地址, 每个让他建立62000个连接 - 给我们总共1,054,000 个连接.

$ for i in `seq 1 17`; do echo sudo ifconfig eth0:$i 10.0.0.$i up ; done

假如你现在运行ifconfig 你应该看到你的虚拟往里接口: eth0:1, eth0:2 … eth0:17, 每个有不同的IP地址。很显然,你应该选择一个你所需要的地址空间。

现在剩下的就是更改第一部分地道的floodtest工具 ,为其指定他应该连接的本地IP…不行的是erlang http 客户端 不让你指定源IP。 ibrowse,另一个可选的http客户端库也不行。妈的。

<疯狂的想法> 
我想到另外的一个选择:建立17对IP - 一个在服务器一个在客户端– 每对都有自己隔离的 /30 子网。我想假如我随后让客户端连接任何一个给定的服务器IP,他将迫使本地IP在子网上成为这对的另一部分,因为只有本地IP能够达到服务器IP。理论上 ,这将意味这在客户端声明本地源IP将不是必须的 (虽然服务器IP区间需要被指定).我不知道这是否能工作 - 这时听起来可以。最后因它太不正规了所以我决定不试了。
</疯狂的想法>

我也研究了OTP’s http_transport 代码并且想为其加入对指定本地IP的支持。尽管它不是你真正需要的一个特性,但它需要更多的工作。

gen_tcp 让你指定源IP ,因此我最终为这个测试用gen_tcp写一个比较粗糙的客户端:

floodtest2.erl

  1. -module ( floodtest2) .
  2. -compile( export_all) .
  3. -define( SERVERADDR , “10.1.2.3″ ) . % where mochiweb is running
  4. -define( SERVERPORT , 8000 ) .
  5.  
  6. % Generate the config in bash like so (chose some available address space):
  7. % EACH=62000; for i in `seq 1 17`; do echo “{{10,0,0,$i}, $((($i-1)*$EACH+1)), $(($i*$EACH))}, “; done
  8.  
  9. run( Interval ) ->
  10.         Config = [
  11. { { 10 ,0 ,0 ,1 } , 1 , 62000 } ,
  12. { { 10 ,0 ,0 ,2 } , 62001 , 124000 } ,
  13. { { 10 ,0 ,0 ,3 } , 124001 , 186000 } ,
  14. { { 10 ,0 ,0 ,4 } , 186001 , 248000 } ,
  15. { { 10 ,0 ,0 ,5 } , 248001 , 310000 } ,
  16. { { 10 ,0 ,0 ,6 } , 310001 , 372000 } ,
  17. { { 10 ,0 ,0 ,7 } , 372001 , 434000 } ,
  18. { { 10 ,0 ,0 ,8 } , 434001 , 496000 } ,
  19. { { 10 ,0 ,0 ,9 } , 496001 , 558000 } ,
  20. { { 10 ,0 ,0 ,10 } , 558001 , 620000 } ,
  21. { { 10 ,0 ,0 ,11 } , 620001 , 682000 } ,
  22. { { 10 ,0 ,0 ,12 } , 682001 , 744000 } ,
  23. { { 10 ,0 ,0 ,13 } , 744001 , 806000 } ,
  24. { { 10 ,0 ,0 ,14 } , 806001 , 868000 } ,
  25. { { 10 ,0 ,0 ,15 } , 868001 , 930000 } ,
  26. { { 10 ,0 ,0 ,16 } , 930001 , 992000 } ,
  27. { { 10 ,0 ,0 ,17 } , 992001 , 1054000 } ] ,
  28.         start( Config , Interval ) .
  29.  
  30. start( Config , Interval ) ->
  31.         Monitor = monitor( ) ,
  32.         AdjustedInterval = Interval / length( Config ) ,
  33.         [ spawn( fun start/5 , [ Lower , Upper , Ip , AdjustedInterval , Monitor ] )
  34.           || { Ip , Lower , Upper }  <- Config ] ,
  35.         ok.
  36.  
  37. start( LowerID , UpperID , _, _, _) when LowerID == UpperID -> done ;
  38. start( LowerID , UpperID , LocalIP , Interval , Monitor ) ->
  39.         spawn ( fun connect/5 , [ ?SERVERADDR , ?SERVERPORT , LocalIP , "/test/" ++LowerID , Monitor ] ) ,
  40.         receive after Interval -> start ( LowerID + 1 , UpperID , LocalIP , Interval , Monitor ) end .
  41.  
  42. connect( ServerAddr , ServerPort , ClientIP , Path , Monitor ) ->
  43.         Opts = [ binary, { packet, 0 } , { ip, ClientIP } , { reuseaddr, true} , { active, false} ] ,
  44.         { ok, Sock } = gen_tcp:connect ( ServerAddr , ServerPort , Opts ) ,
  45.         Monitor ! open,
  46.         ReqL = io_lib:format ( “GET ~s\r \n Host: ~s\r \n \r \n ” , [ Path , ServerAddr ] ) ,
  47.         Req = list_to_binary( ReqL ) ,
  48.         ok = gen_tcp:send ( Sock , [ Req ] ) ,
  49.         do_recv( Sock , Monitor ) ,
  50.         ( catch gen_tcp:close ( Sock ) ) ,
  51.         ok.
  52.  
  53. do_recv( Sock , Monitor ) ->
  54.         case gen_tcp:recv ( Sock , 0 ) of
  55.                 { ok, B } ->
  56.                         Monitor ! { bytes, size( B ) } ,
  57.                         io:format ( “Recvd ~s\n ” , [ binary_to_list( B ) ] ) ,
  58.                         io:format ( “Recvd ~w bytes\n ” , [ size( B ) ] ) ,
  59.                         do_recv( Sock , Monitor ) ;
  60.                 { error, closed} ->
  61.                         Monitor ! closed,
  62.                         closed;
  63.                 Other ->
  64.                         Monitor ! closed,
  65.                         io:format ( “Other:~w\n ” ,[ Other ] )
  66.         end .
  67.  
  68. % Monitor process receives stats and reports how much data we received etc:
  69. monitor( ) ->
  70.         Pid = spawn( ?MODULE , monitor0, [ { 0 ,0 ,0 ,0 } ] ) ,
  71.         timer:send_interval ( 10000 , Pid , report) ,
  72.         Pid .
  73.  
  74. monitor0( { Open , Closed , Chunks , Bytes } =S ) ->
  75.         receive
  76.                 report  -> io :format ( “{Open, Closed, Chunks, Bytes} = ~w\n ” ,[ S ] ) ;
  77.                 open    -> monitor0 ( { Open + 1 , Closed , Chunks , Bytes } ) ;
  78.                 closed  -> monitor0 ( { Open , Closed + 1 , Chunks , Bytes } ) ;
  79.                 chunk   -> monitor0 ( { Open , Closed , Chunks + 1 , Bytes } ) ;
  80.                 { bytes, B } -> monitor0 ( { Open , Closed , Chunks , Bytes + B } )
  81.         end .

作为一个初始的测试,我像第一部分描述的那样连接mochiweb应用 - 它简单的每隔10秒给每个客户端发送一条消息。

erl> c(floodtest2), floodtest2:run(20).

这很快就吃掉了我的内存。

像那样用gen_tcp打开很多连接吃掉了很多内存。 在没有任何其他调整的情况下我想它需要~36GB的内存以保证正常工作。 我没有兴趣试着优化我的快速破解的erlang http 客户端(在真实世界了,这将是1M个web浏览器), 在手的有多于32GB内存的机子只有那台我们产品数据库用机, 我不能找到一个很好的理由就因为测试这个让last.fm下线:)另外,它看起来它一直只能管理打开64,500个端口 。

从这一点我决定采用值得相信的 libevent , 我很高兴发现有这么一个HTTP API。 新版已经有了一个evhttp_connection_set_local_address函数 。感觉很有希望.

这是采用libevent库用C编写的客户端:

  1. #include <sys/types.h>
  2. #include <sys/time.h>
  3. #include <sys/queue.h>
  4. #include <stdlib.h>
  5. #include <err.h>
  6. #include <event.h>
  7. #include <evhttp.h>
  8. #include <unistd.h>
  9. #include <stdio.h>
  10. #include <sys/socket.h>
  11. #include <netinet/in.h>
  12. #include <time.h>
  13. #include <pthread.h>
  14.  
  15. #define BUFSIZE 4096
  16. #define NUMCONNS 62000
  17. #define SERVERADDR “10.103.1.43″
  18. #define SERVERPORT 8000
  19. #define SLEEP_MS 10
  20.  
  21. char buf[ BUFSIZE] ;
  22.  
  23. int bytes_recvd = 0 ;
  24. int chunks_recvd = 0 ;
  25. int closed = 0 ;
  26. int connected = 0 ;
  27.  
  28. // called per chunk received
  29. void chunkcb( struct evhttp_request * req, void * arg)
  30. {
  31.     int s = evbuffer_remove( req->input_buffer, &buf, BUFSIZE ) ;
  32.     //printf(”Read %d bytes: %s\n”, s, &buf);
  33.     bytes_recvd += s;
  34.     chunks_recvd++;
  35.     if ( connected >= NUMCONNS && chunks_recvd%10000 ==0 )
  36.         printf ( “>Chunks: %d\t Bytes: %d\t Closed: %d\n ” , chunks_recvd, bytes_recvd, closed) ;
  37. }
  38.  
  39. // gets called when request completes
  40. void reqcb( struct evhttp_request * req, void * arg)
  41. {
  42.     closed++;
  43. }
  44.  
  45. int main( int argc, char **argv)
  46. {
  47.     event_init( ) ;
  48.     struct evhttp *evhttp_connection;
  49.     struct evhttp_request *evhttp_request;
  50.     char addr[ 16 ] ;
  51.     char path[ 32 ] ; // eg: “/test/123″
  52.     int i,octet;
  53.     for ( octet=1 ; octet<=17 ; octet++) {
  54.         sprintf( &addr, “10.224.0.%d” , octet) ;
  55.         for ( i=1 ;i<=NUMCONNS;i++) {
  56.             evhttp_connection = evhttp_connection_new( SERVERADDR, SERVERPORT) ;
  57.             evhttp_connection_set_local_address( evhttp_connection, &addr) ;
  58.             evhttp_set_timeout( evhttp_connection, 864000 ) ; // 10 day timeout
  59.             evhttp_request = evhttp_request_new( reqcb, NULL ) ;
  60.             evhttp_request->chunk_cb = chunkcb;
  61.             sprintf( &path, “/test/%d” , ++connected) ;
  62.             if ( i%100 ==0 )  printf ( “Req: %s\t ->\t %s\n ” , addr, &path) ;
  63.             evhttp_make_request( evhttp_connection, evhttp_request, EVHTTP_REQ_GET, path ) ;
  64.             evhttp_connection_set_timeout( evhttp_request->evcon, 864000 ) ;
  65.             event_loop( EVLOOP_NONBLOCK ) ;
  66.             if ( connected % 200 == 0 )
  67.                 printf ( \n Chunks: %d\t Bytes: %d\t Closed: %d\n ” , chunks_recvd, bytes_recvd, closed) ;
  68.             usleep( SLEEP_MS*1000 ) ;
  69.         }
  70.     }
  71.     event_dispatch( ) ;
  72.     return 0 ;
  73. }

更多的参数用#define’s硬编码,这样你通过编辑源码来配置他然后重新编译。

编印运行:
$ gcc -o httpclient httpclient.c -levent
$ ./httpclient

这样还是不能打开多余64,500个端口 . 尽管他用了很少的内存。

尽管我指定了本地地址端口还是会超出限制, 临时端口无论是在内核或tcp栈上分配的都会超出2^16。 因此,为了能打开多于64,500个连接, 你需要指定本地地址和本地端口,相应的管理它们。不幸的libevent HTTP API没有选项指定本地端口。我 为 libevent打了补丁 加了一个合适的函数:
void evhttp_connection_set_local_port(struct evhttp_connection *evcon, u_short port); .

这相当不错; libevent编写的被很好, 文档也相当友好。

安装我修改过的libevent, 我可以在the set_local_address下添加如下代码:
evhttp_connection_set_local_port(evhttp_connection, 1024+i);

用他替换后, 从不同的地址的多个连接就能用同一端口号,指定本地地址。我重编译客户端让它运行一段时间以验证他能大多2^16限制。

Netstat验证 :
# netstat -n | awk '/^tcp/ {t[$NF]++}END{for(state in t){print state, t[state]}}’
TIME_WAIT 8
ESTABLISHED 118222

这显示多少端口在不同状态被打开。我们最后能够打开多于2^16个连接.

现在我们有了在一台机子上打开百万http连接的工具。它看起来每个连接消耗2KB内存, 加上内核占用的。是时候用它测试百万连接用户的我们的mochiweb comet服务器了。

C1024K测试-1 百万comet连接

为了这个测试我用了4台不同配置的服务器。这样的配置可能比测试用的高,但是它是有用的将来会用为产品,这能做一个很变态的测试. 所有这四个服务器都在同一个千兆局域网上,中间用了3个交换机和一个路由器。

一百万测试就像第一、二部分的10k测试,主要不同是更改了客户端,现在是用libevent,c编写,我在一个用了多台机子的正式的分布式erlang设置中运行的a。

服务器1 - 四核 2GHz CPU, 16GB 内存

  • 启动订阅管理器
  • 调入好友数据
  • 启动路由器

服务器2 - 双通道四核 2.8GHz CPU, 32GB 内存

  • 启动mochiweb应用

服务器3 - 四核 2GHz CPU, 16GB 内存

  • 创建17个真实ip
  • 安装打了补丁的libevent
  • 运行客户端: ./httpclient 每秒建立100个连接直到1M

服务器4 - 双核 2GHz, 2GB内存

  • 运行msggen程序, 向路由器发送大量的消息

在猛增到一百万连接期间我测量了mochiweb的内存用量,还有在剩下的时间里:

httpclient在每个连接见加了10ms延时,因此打开一百万连接用了将近3个小时。打开1M连接的mochiweb进程固定地内存用大约为25GB. 运行的服务器都由Ganglia监控, 它测量CPU, 网络和内存用量并且生成漂亮的图片:

你可以看到它需要大约38GB内存且开始了swap。我猜想这个不同是因为内核为保持打开的连接而基本被耗光的 。当我开始发送消息是就达到了定点。

消息有1000个进程产生,每个进程平均60ms一条消息,总共每秒大约16,666条消息:

erl> [ spawn( fun()->msggen:start(1000000, 10+random:uniform(100), 1000000) end) || I <- lists:seq(1,1000) ].

服务器 (server-4) 产生消息看起来如下图所示(Ganglia):

每秒有10MB的消息发出 - 每秒16,666条消息. 典型的这些消息来自消息总线,应用服务器,或者已存在架构的一部分。

当我开始发送消息时,服务器1的负载(运行订阅管理器和路由器)一直低于1,CPU占用率从0增到5%。

服务器2的CPU (运行mochiweb应用, 有1M个连接) 增长的比较显著:

自然的,进程当要处理消息是不得不离开休眠状态,内存用量将轻微增加 . 没有消息且所有连接处于打开状态是内存用量的最好时候- 可想的,实际工作时需要更多内存。

从安全方面, mochiweb机器需要40GB内存一打开1M活跃comet连接。30GB用于mochiweb应用,剩下的10GB用于内核.换句话说,每个连接你需要分配40KB。

当用大量连接做各种测试时,我最终对我的sysctl.conf文件做了些修改. 只是部分试错,我真的不知道更多以做出明智的决定关于那个值需要修改的内部原因 . 我的策略是等待问题发生,检测 /var/log/kern.log 看什么神秘的错误被报告, 然后添加听起来很合理的数据. 这是上面测试使用的设置信息:

net.core.rmem_max = 33554432
net.core.wmem_max = 33554432
net.ipv4.tcp_rmem = 4096 16384 33554432
net.ipv4.tcp_wmem = 4096 16384 33554432
net.ipv4.tcp_mem = 786432 1048576 26777216
net.ipv4.tcp_max_tw_b****ets = 360000
 net.core.netdev_max_backlog = 2500
vm.min_free_kbytes = 65536
vm.swappiness = 0
net.ipv4.ip_local_port_range = 1024 65535

我将很喜欢学习更多关于linux tcp调优的知识这样我能对这些设置做更明智的决策. 这些可与确定不是优化的, 但是最少它们足够应付1M的连接. 这些更改运行在一个64bit的elang虚拟机上, 字长是8bytes而不是4, 可能可以解释为什么内存用量比我在第二部分做c10k测试时高的多。

一个用Libevent实现的Erlang C-Node

在为libevent加入HTTP api后, 它看起来完全合理做1M连接测试相对于用c写的http服务器,因此我们有了比较的基础。

我猜打开内核的poll模型意味这erlang虚拟机能够用epol(或类似的),但是即使是这样显然也需要解决负载问题,我们通过委派连接处理给用libevent实现的c程序或许能减轻负载. 我想重用更多的Erlang代码, 因此让我们尽可能少的用c - 只是在连接处理和HTTP部分。 我也寻找了试用Erlang C 接口的一个理由,因此下面的程序组合了两者。他是一个用C和libevent写的comet http服务器用用整数id标志用户(向我们的mochiweb应用), 且扮演一个Erlang C节点.

它连接一个指定的erlang节点, 监听像{123, <<"Hello user 123">>}的消息然后向用户 123分派 “Hello user 123″ , 假如已连接. 那些没有连接用户的消息被丢弃,就像前面的例子。

httpdcnode.c

  1. #include <sys/types.h>
  2. #include <sys/time.h>
  3. #include <sys/queue.h>
  4. #include <stdlib.h>
  5. #include <err.h>
  6. #include <event.h>
  7. #include <evhttp.h>
  8. #include <stdio.h>
  9. #include <sys/socket.h>
  10. #include <netinet/in.h>
  11.  
  12. #include “erl_interface.h”
  13. #include “ei.h”
  14.  
  15. #include <pthread.h>
  16.  
  17. #define BUFSIZE 1024
  18. #define MAXUSERS (17*65536) // C1024K
  19.  
  20. // List of current http requests by uid:
  21. struct evhttp_request * clients[ MAXUSERS+1 ] ;
  22. // Memory to store uids passed to the cleanup callback:
  23. int slots[ MAXUSERS+1 ] ;
  24.  
  25. // called when user disconnects
  26. void cleanup( struct evhttp_connection *evcon, void *arg)
  27. {
  28.     int *uidp = ( int *) arg;
  29.     fprintf( stderr, “disconnected uid %d\n ” , *uidp) ;
  30.     clients[ *uidp] = NULL ;
  31. }
  32.  
  33. // handles http connections, sets them up for chunked transfer,
  34. // extracts the user id and registers in the global connection table,
  35. // also sends a welcome chunk.
  36. void request_handler( struct evhttp_request *req, void *arg)
  37. {
  38.         struct evbuffer *buf;
  39.         buf = evbuffer_new( ) ;
  40.         if ( buf == NULL ) {
  41.             err( 1 , “failed to create response buffer” ) ;
  42.         }
  43.  
  44.         evhttp_add_header( req->output_headers, “Content-Type” , “text/html; charset=utf-8″ ) ;
  45.  
  46.         int uid = -1 ;
  47.         if ( strncmp( evhttp_request_uri( req) , “/test/” , 6 ) == 0 ) {
  48.             uid = atoi( 6 +evhttp_request_uri( req) ) ;
  49.         }
  50.  
  51.         if ( uid <= 0 ) {
  52.             evbuffer_add_printf( buf, “User id not found, try /test/123 instead” ) ;
  53.             evhttp_send_reply( req, HTTP_NOTFOUND, “Not Found” , buf) ;
  54.             evbuffer_free( buf) ;
  55.             return ;
  56.         }
  57.  
  58.         if ( uid > MAXUSERS) {
  59.             evbuffer_add_printf( buf, “Max uid allowed is %d” , MAXUSERS) ;
  60.             evhttp_send_reply( req, HTTP_SERVUNAVAIL, “We ran out of numbers” , buf) ;
  61.             evbuffer_free( buf) ;
  62.             return ;
  63.         }
  64.  
  65.         evhttp_send_reply_start( req, HTTP_OK, “OK” ) ;
  66.         // Send welcome chunk:
  67.         evbuffer_add_printf( buf, “Welcome, Url: ‘%s’ Id: %d\n ” , evhttp_request_uri( req) , uid) ;
  68.         evhttp_send_reply_chunk( req, buf) ;
  69.         evbuffer_free( buf) ;
  70.  
  71.         // put reference into global uid->connection table:
  72.         clients[ uid] = req;
  73.         // set close callback
  74.         evhttp_connection_set_closecb( req->evcon, cleanup, &slots[ uid] ) ;
  75. }
  76.  
  77.  
  78. // runs in a thread - the erlang c-node stuff
  79. // expects msgs like {uid, msg} and sends a a ‘msg’ chunk to uid if connected
  80. void cnode_run( )
  81. {
  82.     int fd;                                  /* fd to Erlang node */
  83.     int got;                                 /* Result of receive */
  84.     unsigned char buf[ BUFSIZE] ;              /* Buffer for incoming message */
  85.     ErlMessage emsg;                         /* Incoming message */
  86.  
  87.     ETERM *uid, *msg;
  88.  
  89.     erl_init( NULL , 0 ) ;
  90.  
  91.     if ( erl_connect_init( 1 , “secretcookie” , 0 ) == -1 )
  92.         erl_err_quit( “erl_connect_init” ) ;
  93.  
  94.     if ( ( fd = erl_connect( “httpdmaster@localhost” ) ) < 0 )
  95.         erl_err_quit( “erl_connect” ) ;
  96.  
  97.     fprintf( stderr, “Connected to httpdmaster@localhost\n \r ” ) ;
  98.  
  99.     struct evbuffer *evbuf;
  100.  
  101.     while ( 1 ) {
  102.         got = erl_receive_msg( fd, buf, BUFSIZE, &emsg) ;
  103.         if ( got == ERL_TICK) {
  104.             continue ;
  105.         } else if ( got == ERL_ERROR) {
  106.             fprintf( stderr, “ERL_ERROR from erl_receive_msg.\n ” ) ;
  107.             break ;
  108.         } else {
  109.             if ( emsg.type == ERL_REG_SEND) {
  110.                 // get uid and body data from eg: {123, <<”Hello”>>}
  111.                 uid = erl_element( 1 , emsg.msg ) ;
  112.                 msg = erl_element( 2 , emsg.msg ) ;
  113.                 int userid = ERL_INT_VALUE( uid) ;
  114.                 char *body = ( char *) ERL_BIN_PTR( msg) ;
  115.                 int body_len = ERL_BIN_SIZE( msg) ;
  116.                 // Is this userid connected?
  117.                 if ( clients[ userid] ) {
  118.                     fprintf( stderr, “Sending %d bytes to uid %d\n ” , body_len, userid) ;                
  119.                     evbuf = evbuffer_new( ) ;
  120.                     evbuffer_add( evbuf, ( const void *) body, ( size_t) body_len) ;
  121.                     evhttp_send_reply_chunk( clients[ userid] , evbuf) ;
  122.                     evbuffer_free( evbuf) ;
  123.                 } else {
  124.                     fprintf( stderr, “Discarding %d bytes to uid %d - user not connected\n ” ,
  125.                             body_len, userid) ;                
  126.                     // noop
  127.                 }
  128.                 erl_free_term( emsg.msg ) ;
  129.                 erl_free_term( uid) ;
  130.                 erl_free_term( msg) ;
  131.             }
  132.         }
  133.     }
  134.     // if we got here, erlang connection died.
  135.     // this thread is supposed to run forever
    1.     // TODO - gracefully handle failure / reconnect / etc
    2.     pthread_exit( 0 ) ;
    3. }
    4.  
    5. int main( int argc, char **argv)
    6. {
    7.     // Launch the thread that runs the cnode:
    8.     pthread_attr_t tattr;
    9.     pthread_t helper;
    10.     int status;
    11.     pthread_create( &helper, NULL , cnode_run, NULL ) ;
    12.  
    13.     int i;
    14.     for ( i=0 ;i<=MAXUSERS;i++) slots[ i] =i;
    15.     // Launch libevent httpd:
    16.     struct evhttp *httpd;
    17.     event_init( ) ;
    18.     httpd = evhttp_start( “0.0.0.0″ , 8000 ) ;
    19.     evhttp_set_gencb( httpd, request_handler, NULL ) ;
    20.     event_dispatch( ) ;
    21.     // Not reached, event_dispatch() shouldn’t return
    22.     evhttp_free( httpd) ;
    23.     return 0 ;
    24. }

     

    最大用户数由#defined定义, 类似提及的mochiweb服务器, 他监听在8000端口,期待着用户用/test/<userid>的形式连接,erlang节点的名称被硬编码,他将为接收消息连接 httpdmaster@localhost , erlang cookie, “secretcookie”. 相应的改变这些。

    首先运行连接的erlang节点:
    $ erl -setcookie secretcookie -sname httpdmaster@localhost

    编译运行:
    $ gcc -o httpdcnode httpdcnode.c -lerl_interface -lei -levent
    $ ./httpdcnode

    在erlang shell, 检查你能看到隐藏的c-node:
    erl> nodes(hidden).
    [c1@localhost]

    现在在你的浏览器中连接 http://localhost:8000/test/123 . 你将看到欢迎信息.

    现在回到erlang shell - 向C节点发送一条消息:

    erl> {any, c1@localhost} ! {123, <<"Hello Libevent World">>}.

    注意我们没有用pid,以此我们用另外一种形式 {procname, node}.我们用 ‘any’ 作为进程名称,它可以被C节点忽略。

    现在你能够通过erlang传递comet消息, 但是所有的http连接都被用libevent C语言编写的erlang节点管理。

    在删除debug打印语句后, 我用同一个上面提到的客户端向http C节点服务器连接1M个用户, 这台机子显示少于10G的内存被使用。服务器进程常驻内存稳定在2G以下:

    因此, 当处理大量连接时相比较webchiweb有很大的节省,对于用libevent服务器进程每个连接的常驻内存低于2KB。所有的都连接后,服务器状态如下:
    Mem: 32968672k total, 9636488k used, 23332184k free, 180k buffers

    每个连接的内核/tcp 栈占了另外8KB内存, 看起来有点高,但是我没有基础对象用来与之比较。libevent-cnode服务器需要多点的工作 . 他还不能聪明的处理从同一个用户来的多连接, 没有锁,假如你在一个消息被分发出去后断开连接,这时就有一个竞争条件存在.

    即使这样, 我想这将被普遍化,以这种方式,他允许你用 Erlang 做很多有意思的事, 有一个C+libevent进程充当一个默默无闻的连接池 . 通过更多的包装代码和回调到erlang,你几乎不需要知道这个怎么运行的-C程序作为一个驱动或者一个C节点运行,一个Erlang包装器能给你一个合适的建筑于libevent的api. (看 这个 ,一个Erlang C驱动). 我将来在这上面会尝试更多。

    最后的思考

    我现在有足够的数据判断假如我为Last.fm发布一个大的伸缩性的comet系统我到底需要多少硬件. 即使每个连接40KB有些浪费但不是过分的-内存很便宜,40GB能支持一个百万用户的系统不过分. 10GB更好. 我将完成这个应用,我将在哪个地方构建发布它,人们可以试用它. 顺着这条路我将整理erlang memcached客户端,我正在用且分发他

文章发表于 erlang.



0 条评论

订阅本条RSS 及时了解评论动态.

支持HTML

(必填)