Scalability and load-balancing

RTMFP (Real Time Media Flow Protocol) uses a server end-point to negotiate the P2P connection between clients. All media is sent directly between clients without being routed through the server, which makes deployments scalable and very cost-effective. However, even though the media flows between the clients, one server instance can negotiate connections for only a limited number of clients before CPU and memory limits are reached.

To resolve this issue, MonaServer includes a full framework that enables communication between multiple MonaServer instances. The framework detects server connections and disconnections, manages exchanges of data between servers, manages load balancing by redirecting clients, and has features to synchronise client information for the rendezvous service and NetGroup RTMFP options. All communication between servers uses raw TCP.

The main idea is simple: by default, each instance is an independent server that shares nothing with the others. You decide which resources are shared between the server instances.

This page describes every feature of this framework, illustrated with code samples and usage context. The API page lists all these features too, but without code samples or usage context.

Finally, the script samples here only illustrate how to use the framework; to learn how to create a server application, see the Server Application page.

Configuration

First, to make several MonaServer instances communicate, you have to configure them. The following three parameters enable the multiple-servers mode:

  • host configures the public address of the server, which is used in client redirections.
  • servers.port configures the port that receives incoming server connections.
  • servers.targets configures the addresses of the remote MonaServer instances to connect to.

Here follows an illustration of one configuration with two servers:

_images/TwoServersScalability.png
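
As a rough sketch, the two-server setup could be written as the two configuration files below. The address 192.168.0.2, the port 1936 and the host name www.hostB.com are taken from the examples on this page; the host name www.hostA.com is an assumption made for illustration only.

```ini
;MonaServer.ini on server A (initiator of the connection)
;www.hostA.com is a hypothetical public address
host = www.hostA.com
[servers]
;A connects to the incoming server port of B
targets = 192.168.0.2:1936
```

```ini
;MonaServer.ini on server B (target of the connection)
host = www.hostB.com
[servers]
;B listens here for incoming server connections
port = 1936
```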

Warning

Exchanges between servers use unencrypted TCP. To prevent an attack through the incoming port of B, its configured servers.port should be protected by a firewall that only allows connections from the other servers.

The following scripts should be placed in the root main.lua file so that they are loaded at start.

Here A initiates the connection to B (servers.targets configured). A sees B as a target:

-- Server application on A side
function onServerConnection(server)
  if server.isTarget then
    NOTE("Target gotten : ", server.address, " (", server.host, " for clients)")
    -- displays "Target gotten : 192.168.0.2 (www.hostB.com for clients)"
  end
end

B, which has an incoming port configured (1936), accepts the connection from A. B sees A as an initiator:

-- Server application on B side
function onServerConnection(server)
  NOTE(server.isTarget) -- displays "false"
end

Warning

If servers A and B each configure the other as a target, two TCP connections are created, which causes confusion in the server exchanges:

_images/DoubleConnection.png

This configuration system lets you scale an existing system horizontally without restarting the servers already running. Indeed, the first server started can configure its incoming server port (servers.port) and no target, and a new server can later extend the system by putting the address of the first server in its servers.targets configuration.

Of course, more complex configurations are possible, with multiple servers and individual properties per server (see Configurations):

;MonaServer.ini
host = www.myhost.com:1935
[servers]
targets = 192.168.0.2:1936?type=master;192.168.0.3:1936

function onServerConnection(server)
  if server.type=="master" then -- true here just for 192.168.0.2:1936 server
    NOTE("Master server connected")
  end
end
function onServerDisconnection(server)
  if server.type=="master" then -- true here just for 192.168.0.2:1936 server
    NOTE("Master server disconnected")
  end
end

Warning

Server applications that have the same path (www/myGame on server A and on server B) are synchronized, but they are reloaded only when a client connects. This means that if you edit the file www/myGame/main.lua on server A, server A rebuilds its version on the next client connection and tries to rebuild the server B version too (the reload is effective only if the server B version has changed as well). But if you edit only the server B version and clients always connect through server A, you also have to edit the server A version to get the server B application refreshed on a client connection.

It is also possible to reject a server by raising an error in the onServerConnection function:

function onServerConnection(server)
  -- Reject all connections not coming from localhost
  if server.address ~= "127.0.0.1" then
    -- error() takes a single message, so the parts are concatenated
    error(server.address .. " is trying to connect to the server => rejected")
  end
end
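
A variant of the rejection rule above can be sketched with a whitelist instead of a single address. The names ALLOWED_SERVERS and checkServer are hypothetical, the mock tables at the end only stand in for the server object that MonaServer passes to onServerConnection, and the sketch assumes that server.address holds the bare IP address, as in the log output shown earlier.

```lua
-- Hypothetical whitelist of server addresses allowed to connect
local ALLOWED_SERVERS = {
  ["127.0.0.1"] = true,
  ["192.168.0.2"] = true,
}

-- Hypothetical helper: raises an error (which rejects the server)
-- when the address is not in the whitelist
local function checkServer(server)
  if not ALLOWED_SERVERS[server.address] then
    error(tostring(server.address) .. " is trying to connect to the server => rejected")
  end
end

function onServerConnection(server)
  checkServer(server)
end

-- Standalone check with mock server objects (not needed inside MonaServer)
local accepted = pcall(onServerConnection, { address = "127.0.0.1" })
local rejected, message = pcall(onServerConnection, { address = "10.0.0.9" })
print(accepted, rejected, message)
```

Keeping the list in a table means that a new trusted server only requires one more entry, not another condition in the test.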

Exchange data and resources

To exchange data between servers, call the server:send method on the sender side (see the Server object description) and define RPC functions as members of the server object on the receiver side:

function onServerConnection(server)
  -- RPC function declaration, to receive data from one other server
  function server:onHello(name)
    self.name = name
  end
  -- send my name to the incoming server (it will receive it on its "onHello" method)
  server:send("onHello","MonaServer A")
end

-- now you can find the name of each server everywhere
for index,server in mona.servers:ipairs() do
  NOTE("Server '"..server.name.."' at address "..server.address)
end

Warning

self.name = name in the body of onHello creates a name value on the server object. Be careful with this kind of assignment: the server object is shared with all the other server applications. If another server application also attaches a name value to this server object, it overwrites the previous assignment. One solution is to prefix the property with the name of the current application.

The main goal of this exchange mechanism is to share the desired resources between all the server instances. For example, if you use Mona to stream through the server (server bypass configuration, no P2P) to many subscribers, there are usually a small number of publishers and a very large number of subscribers. The server can support the publisher load but could be saturated by the large number of listeners. One solution in this case is to scale the system horizontally to spread the subscriber load.
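
One way to apply the advice above is to keep all values of one application in a single sub-table of the server object. This is only a sketch: the application name "myGame" and the helper appData are hypothetical, and the mock table at the end replaces the real server object so that the snippet runs on its own.

```lua
-- Hypothetical application name used as a namespace key
local APP = "myGame"

-- Returns (and creates on first use) this application's private table
-- on the given server object
local function appData(server)
  local data = server[APP]
  if not data then
    data = {}
    server[APP] = data
  end
  return data
end

function onServerConnection(server)
  function server:onHello(name)
    appData(self).name = name -- stored as server.myGame.name
  end
end

-- Standalone check with a mock server object
local mock = { address = "192.168.0.2", name = "set by another application" }
onServerConnection(mock)
mock:onHello("MonaServer A")
print(mock.name, mock.myGame.name)
```

The value written by the other application is left untouched. RPC functions such as onHello are fields of the same object, so the same care probably applies to their names.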

_images/ThreeServersExchange.png

Here we have a configuration with three servers, but many others could be added dynamically. The load balancing can be handled through DNS, but the publications have to be shared between all three (or more) servers; otherwise a subscriber might not find a publication. Below is a complete server application that shares publications between all the servers.

-- following server (horizontal scaling)
_nextServer = nil

-- number of subscribers (listeners) for this server
_subscribers = 0

function onConnection(client,...)
  INFO("Connection of a new client on ", mona.configs["host"])

  function client:onPublish(publication)
    -- informs the following server about this publication
    if _nextServer then _nextServer:send("publish", publication.name) end

    function publication:onVideo(time, packet)
      if not _nextServer then return end
      -- forward the video packet to the following server
      _nextServer:send("video", publication.name, time, packet)
    end
    function publication:onAudio(time, packet)
      if not _nextServer then return end
      -- forward the audio packet to the following server
      _nextServer:send("audio", publication.name, time, packet)
    end
    function publication:onData(name, packet)
      INFO("onData : ", name, " - ", packet)
      if not _nextServer then return end
      -- forward the data packet to the following server
      _nextServer:send("data", publication.name, name, packet)
    end
  end

  function client:onUnpublish(publication)
    -- informs the following server about this unpublication
    if _nextServer then _nextServer:send("unpublish",publication.name) end
  end

  function client:onSubscribe(listener)
    -- if a following server exists and this server already has 400 subscribers or more,
    -- redirect the client to the following server:
    -- raise an error whose description carries the address of the redirection server
    INFO("Subscription of client ", client.address, " (_subscribers=", _subscribers, ")")
    if _nextServer and _subscribers>=400 then error(_nextServer.host) end
    _subscribers = _subscribers + 1
  end

  function client:onUnsubscribe(listener)
    _subscribers = _subscribers - 1
  end
end

function onServerConnection(server)
  if server.isTarget then
    -- incoming server is a following server!
    if _nextServer then error("following server already connected") end
    _nextServer = server
    -- informs the following server about my publications
    for id,publication in pairs(mona.publications) do
      _nextServer:send("publish",publication.name)
    end
  else
    -- incoming server is a previous server, we have to create RPC functions to receive
    -- information about its publications
    server.publications = {}
    function server:publish(name)
      -- publication creation
      self.publications[name] = mona:publish(name)
    end
    function server:unpublish(name)
      -- publication suppression
      local publication = self.publications[name]
      if publication then publication:close() end
      self.publications[name] = nil
    end
    function server:video(name, time, packet)
      local publication = self.publications[name]
      -- give the video packet to our publication copy
      if publication then publication:pushVideo(time, packet) end
    end
    function server:audio(name, time, packet)
      local publication = self.publications[name]
      -- give the audio packet to our publication copy
      if publication then publication:pushAudio(time, packet) end
    end
    function server:data(name, dataname, packet)
      local publication = self.publications[name]
      -- give the data packet to our publication copy
      if publication then publication:pushData(packet) end
    end
  end
end

function onServerDisconnection(server)
  if server.isTarget then
    -- disconnected server was a following server!
    _nextServer = nil
    return
  end
  -- disconnected server was a previous server, close its publications
  for id,publication in pairs(server.publications) do
    publication:close()
  end
end

The line if _nextServer and _subscribers>=400 then error(_nextServer.host) end requires specific client code to redirect the new subscriber to the next server:

function onStatusEvent(event:NetStatusEvent):void {
  switch(event.info.code) {
    case "NetStream.Play.Failed":
      var error:Array = event.info.description.split(" ");
      if (error.length > 0) {
        var host:String = "rtmfp://" + error[error.length-1];
        _netConnection.close();
        _netConnection.connect(host);
      }
      break;
  }
}
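
To see how the 400-subscriber threshold spreads the load along the chain, the redirection logic can be simulated outside MonaServer. In this sketch plain tables stand in for the server instances, the host names are hypothetical, and the function connectSubscriber imitates a client that reconnects to the host it receives in the error description.

```lua
-- Standalone simulation: plain tables stand in for MonaServer instances
local LIMIT = 400 -- same threshold as in the server application above

-- Build a chain A -> B -> C (host names are hypothetical)
local serverC = { host = "www.hostC.com", subscribers = 0, nextServer = nil }
local serverB = { host = "www.hostB.com", subscribers = 0, nextServer = serverC }
local serverA = { host = "www.hostA.com", subscribers = 0, nextServer = serverB }

local byHost = {
  [serverA.host] = serverA,
  [serverB.host] = serverB,
  [serverC.host] = serverC,
}

-- Mirrors client:onSubscribe: returns the host to retry on,
-- or nil when the subscription is accepted
local function subscribe(server)
  if server.nextServer and server.subscribers >= LIMIT then
    return server.nextServer.host
  end
  server.subscribers = server.subscribers + 1
  return nil
end

-- Mirrors the client side: follow redirections until a server accepts
local function connectSubscriber(entry)
  local server = entry
  while true do
    local redirect = subscribe(server)
    if not redirect then return server end
    server = byHost[redirect]
  end
end

for i = 1, 1000 do connectSubscriber(serverA) end
print(serverA.subscribers, serverB.subscribers, serverC.subscribers)
```

With 1000 subscribers entering through A, the first two servers stop at 400 each and the last one takes the remaining 200, because a server without a following server never redirects.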

Load balancing and rendezvous service

In a load-balancing solution, we usually opt for a hardware solution, with a DNS that returns IP addresses in rotation from a list. You can achieve the same in software using the onHandshake(address,path,properties,attempts) event:

-- index incremented to redirect client equally to each server
index=0
function onHandshake(address,path,properties,attempts)
  index=index+1
  if index > mona.servers.count then index=1 end -- do not exceed the number of servers available
  return mona.servers(index) -- load-balancing system!
end
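
The rotation itself can be tried outside MonaServer. In the sketch below a plain Lua table of hypothetical addresses replaces mona.servers, and the helper nextServer is an illustrative name; the modulo form is just a compact alternative to the comparison used above.

```lua
-- Standalone sketch: a plain table replaces mona.servers (addresses are hypothetical)
local servers = { "192.168.0.2:1935", "192.168.0.3:1935", "192.168.0.4:1935" }

local index = 0

-- Returns the next address in rotation, or nil when no server is known
local function nextServer()
  if #servers == 0 then return nil end
  index = index % #servers + 1 -- 1, 2, ..., #servers, then back to 1
  return servers[index]
end

local picked = {}
for i = 1, 5 do picked[i] = nextServer() end
print(table.concat(picked, " "))
```

Five successive calls walk through the three addresses and then wrap around to the first two again.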

Here the server doesn’t accept any client connection; it redirects the client during the handshake. There is no real benefit compared with a hardware solution. Another possibility is to return several server addresses to benefit from the parallel connection behavior of the RTMFP protocol.

function onHandshake(address,path,properties,attempts)
  return mona.servers
end

Indeed, the client receives multiple server addresses; in this case, RTMFP starts multiple connection attempts in parallel and keeps only the fastest to answer. It’s another form of load balancing: the fastest server wins.

Regarding the P2P rendezvous service of Mona in a multiple-servers setup: if peerA, connected to MonaServerA, requests a connection to peerB, connected to MonaServerB, MonaServerA is unable to return information about peerB. We have to use the onRendezVousUnknown(protocol, peerId) event:

function onRendezVousUnknown(protocol, peerId)
  return mona.servers -- redirect to all the connected servers
end

With the above code addition, a rendezvous request that fails locally is redirected to the other servers.

A solution is still missing to synchronize group members when NetGroup is used. Indeed, a groupA can exist on serverA and contain peerA, and the same groupA can also exist on serverB and contain peerB; peerA and peerB will never meet. To solve this, you have to use the groups:join method (see the Groups object description for a complete description of this method). The idea is simple: share every group membership change between all servers. The following server application code does this sharing job:

function onRendezVousUnknown(protocol, peerId)
  return mona.servers -- redirect to all the connected servers
end

function onConnection(client)
  function client:onJoinGroup(group)
    -- inform other servers of this joining operation
    mona.servers:broadcast("join",group.rawId,client.rawId)
  end

  function client:onUnjoinGroup(group)
    -- inform other servers of this unjoining operation
    mona.servers:broadcast("unjoin",group.rawId,client.rawId)
  end
end

function onServerConnection(server)
  -- inform this new incoming server of my existing group/client relations
  for id,group in pairs(mona.groups) do
    -- iterate over the members of this group (not over mona.groups again)
    for i,client in ipairs(group) do
      server:send("join",group.rawId,client.rawId)
    end
  end

  server.groups = {}
  -- RPC server functions to receive joining/unjoining operation
  function server:join(groupId,clientId)
    -- creation of a virtual member for this group
    local member = mona:joinGroup(clientId,groupId)
    if not member then return end -- join operation has failed
    -- We have to attach this member object to its server
    -- to avoid its destruction by the Lua garbage collector
    local group = self.groups[groupId]
    if not group then group = {size=0}; self.groups[groupId] = group end
    group.size = group.size + 1
    group[clientId] = member
  end
  function server:unjoin(groupId,clientId)
    -- removal of a possible virtual member of the group
    local group = self.groups[groupId]
    if not group then return end
    local member = group[clientId]
    if member then
      member:release() -- detach of its group
      group[clientId] = nil
      group.size = group.size - 1
    end
    -- erase the group object if it's empty now
    if group.size==0 then self.groups[groupId]=nil end
  end
end

function onServerDisconnection(server)
  -- release the possible virtual members attached to this server
  for groupId,group in pairs(server.groups) do
    for clientId,member in pairs(group) do
      -- the "size" key holds the counter, not a member
      if clientId ~= "size" then member:release() end
    end
  end
end
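
The bookkeeping done by server:join and server:unjoin can be exercised on its own. The sketch below is a variant under stated assumptions, not the code above: members are kept in a separate members sub-table so that the size counter cannot collide with a client id, and newMember is a hypothetical mock of the object returned by mona:joinGroup that only records whether release() was called.

```lua
-- Mock virtual member: only records whether release() was called
local function newMember()
  local member = { released = false }
  function member:release() self.released = true end
  return member
end

local groups = {} -- plays the role of server.groups

local function join(groupId, clientId, member)
  local group = groups[groupId]
  if not group then
    group = { size = 0, members = {} }
    groups[groupId] = group
  end
  if group.members[clientId] then return end -- ignore duplicate joins
  group.members[clientId] = member
  group.size = group.size + 1
end

local function unjoin(groupId, clientId)
  local group = groups[groupId]
  if not group then return end
  local member = group.members[clientId]
  if member then
    member:release() -- detach it from its group
    group.members[clientId] = nil
    group.size = group.size - 1
  end
  -- erase the group entry once it is empty
  if group.size == 0 then groups[groupId] = nil end
end

local m1, m2 = newMember(), newMember()
join("groupA", "peer1", m1)
join("groupA", "peer2", m2)
unjoin("groupA", "peer1")
local sizeAfterFirstUnjoin = groups.groupA.size
unjoin("groupA", "peer2")
print(sizeAfterFirstUnjoin, m1.released, m2.released, groups.groupA)
```

With this layout the cleanup on server disconnection can iterate over group.members directly, without having to skip a "size" key.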