出口插件之redis与es,Socket多进程间的音讯推送示

常用的输入插件:

最常用的三个出口插件:

前言

  • tcp
  • redis
  • redis
  • es

这段时直接到了贰个花销中间转播服务的需求,即支付多少经过http接口传到中间转播服务器,中间转播服务器将开荒多少发送到异构后台(Lua卡塔尔的钦命tcp socket。

一、tcp

一、redis

图片 1

1、用法

1、用法

风流洒脱最早评估的时候认为蛮简单的,就是http server和tcp server间的通讯,不是贰个Event实例就能够减轻的情事管理问题啊?注册二个事件A用于新闻传递,在socket连接时登记唯风度翩翩的ID,然后在http选取到多少时,emit事件A;在监听到事件A时,在tcp server中查找钦点ID对应的socket管理该数量就可以。

 1 input {
 2     tcp {
 3         port => 4560
 4         codec => json_lines
 5         mode => server
 6         host => 0.0.0.0
 7         add_field => {"xxx":"xxx"}
 8         ssl_cert => /xxx
 9         ssl_enable => false
10         ssl_extra_chain_certs => ["xxx"]
11         ssl_key => /xxx
12         ssl_key_passphrase => nil
13         ssl_verify => true
14         tags => ["xxx"]
15         type => xxx
16     }
17 }
 1 output {
 2     redis{
 3         batch => false
 4         batch_events => 50
 5         batch_timeout => 5
 6         codec => plain
 7         congestion_interval => 1
 8         congestion_threshold => 0
 9         data_type => list
10         db => 0
11         host => ["127.0.0.1:6379"]
12         key => xxx
13         password => xxx
14         port => 6379
15         reconnect_interval => 1
16         shuffle_hosts => true
17         timeout => 5
18         workers => 1
19     }
20 }

固然node.js在高并发方面有科学的天性,不过单个tcp server实例的承载本领有限,为制止服务器过载,node.js 单进度的内部存款和储蓄器有上限(私下认可2G卡塔尔,能包容的长连接客商端数十分少。但随着事情的扩充,我们须求思虑多机集群安顿,顾客端能够连绵不断到任焕发青新岁点,并发送音信。怎么样达成多节点的同不常间推送,大家要求树立后生可畏套多节点之间的新闻分发/订阅架构。常用的第三方音讯管理库有 RabbitMQ和Redis等。在这地,笔者用的是Redis的订阅公布服务。

2、常用配置

2、配置项

redis.io有三个相比较成熟的redis音讯中转库socket.io-redis (本地下载卡塔尔。但我们项目中异构后台用到的决不websocket,而是原生的TCP原生的Socket。用原生redis的sub/pubs完成并简单,就手写了。

如上port是必选项,其余是可选项。

以上全部配置项都以可选的,从没必需的。(以下4个石绿配置是最重大的4个布局

redis在该品种中根本起到一个音信分发中央(publish/subscribe)的成效。当http央浼的开支多少发送过来时,则透过redis的publish功能往全部的channel推送新闻,那样具备订阅该channel的socket server就会吸收接纳回调,然后推送到钦点顾客端。在应用层看跟伊夫nt事件音讯的管理差没有多少。

二、redis

  • 批管理类(仅用于data_type为list)
    • batch:设为true,通过发送一条rpush命令,存储一群的多寡
      • 默认为false:1条rpush命令,存储1条数据
      • 设为true之后,1条rpush会发送batch_events条数据或发送batch_timeout秒(决议于哪一个先达到卡塔尔国
    • batch_events:一次rpush多少条
      • 默认50条
    • batch_timeout:三遍rpush最多消耗多少s
      • 默认5s
  • 编码类
    • codec:对出口数据开展codec,制止接收logstash的separate filter
  • 闭塞怜惜(仅用于data_type为list)
    • congestion_interval:每多久进行二遍堵塞检查
      • 默认1s
      • 设为0,表示对每rpush贰个,都举行检测
    • congestion_threshold:list中最多能够存在多少个item数据
      • 暗中同意是0:表示禁止使用窒碍检验
      • 当list中的数据量达到congestion_threshold,会堵塞直到有此外花费者花销list中的数据
      • 作用:防止OOM
  • data_type
    • list:使用rpush
    • channel:使用publish
  • db:使用redis的数据库,暗中认可使用0号
  • host:数组
    • eg.["127.0.0.1:6380", "127.0.0.1"]
    • 能够钦命port,会覆盖全局port
  • port:全局port,默认6379
  • key:list或channel的名字
    • 支撑动态key,比如:logstash-%{type}
  • password:redis密码,暗中同意不行使密码
  • reconnect_interval:退步重连的区间,默以为1s
  • timeout:连接超时,暗中认可5s
const redis = require("redis"),
 redisClient = redis.createClient,
 REDIS_CFG = {
  host: '127.0.0.1',
  port: 6379
 },
 sub = redisClient(REDIS_CFG),
 pub = redisClient(REDIS_CFG),
 PAY_MQ_CHANNEL = 'pay_mq_channel';

// 监听频道的消息回调
sub.on('message', function(channel, message) {
 switch (channle){
  case PAY_MQ_CHANNEL:
   console.log('notification received:', message);

   // 广播消息到指定socket

   break;
 }
});
// 订阅频道
sub.subscribe(PAY_MQ_CHANNEL);

// 当接收到支付数据时,推送频道消息
pub.publish(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});

1、用法

二、es

鉴于redis的sub/pub的channel订阅数有上限,所以提议豆蔻梢头类新闻使用二个channel,三个channel下接受map、set或数组来囤积订阅时的回调函数,在摄取到订阅新闻时遍历施行回调函数。

 1 input {
 2     redis {
 3         data_type => list
 4         port => 6379
 5         codec => json
 6         db => 0
 7         host => 127.0.0.1
 8         add_field => {"xxx":"xxx"}
 9         key => xxx
10         password => xxx
11         threads => 1
12         timeout => 5
13         batch_count => 1
14         tags => ["xxx"]
15         type => xxx
16     }
17 }

1、使用格局

下边是本身封装好的Redis组件(RedisMQProxy.js):

2、配置

 1 output {
 2     elasticsearch {
 3         hosts => ["127.0.0.1:9200"] 
 4         action => index
 5         cacert => /xxx
 6         codec => plain
 7         doc_as_upsert => false
 8         document_id => 1
 9         document_type => xxx
10         flush_size => 500
11         idle_flush_time => 1
12         index => logstash-%{ YYYY.MM.dd}
13         keystore => /xxx
14         keystore_password => xxx
15         manage_template => true
16         max_retries => 3
17         parent => nil
18         password => xxx
19         path => /
20         proxy => xxx
21         retry_max_interval => 2
22         routing => xxx
23         script => xxx
24         script_lang => xxx
25         script_type => inline
26         script_var_name => event
27         scripted_upsert => false
28         sniffing => false
29         sniffing_delay => 5
30         ssl => false
31         ssl_certificate_verification => true
32         template => /xxx
33         template_name => logstash
34         template_overwrite => false
35         timeout => 5
36         truststore => /xxx
37         truststore_password => xxx
38         upsert => xxx
39         user => xxx
40         workers => 1
41     }
42 }
/*
 * redis 订阅/发布
 */
const _ = require('lodash'),
 redis = require("redis"),
 REDIS_CFG = {
  host: '127.0.0.1',
  port: 6379
 },
 sub = redisClient(REDIS_CFG),
 pub = redisClient(REDIS_CFG);

let SubListenerFuns = {}; // channel的回调函数列表

let RedisMQProxy = {

 // 订阅channel
 on(channel, cb, errorCb, once = false) {
  sub.subscribe(channel); // 订阅channel消息

  // 将回调函数存放数组中
  SubListenerFuns[channel] = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
  SubListenerFuns[channel].push({
   once, cb, errorCb
  });
 },

 // 监听一次性的channel回调函数
 once(channel, cb, errorCb) {
  this.on(channel, cb, errorCb, true);
 },

 // 发送channel消息
 emit(channel, message) {
  if(!_.isString(message)) {
   message = JSON.stringify(message);
  }
  pub.publish(channel, message);
 },

 // 移除channel上的监听函数
 removeListener(channel, func) {
  let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
  for(let i = 0, l = channelHandlers.length; i < l; i  ) {
   let handler = channelHandlers[i] || {};
   let cb = handler.cb;
   if(func && func == cb) {
    channelHandlers.splice(i, 1);
    return false;
   }
  }
 }
};

RedisMQProxy.SubListeners = SubListenerFuns;

pub.on('error', onError);
sub.on('error', onError);

// 监听redis的订阅消息
sub.on("message", function(channel, message) {
 // 遍历执行channel的回调函数
 try {
  message = JSON.parse(message);
 } catch(e) {}
 broadcastToChannel(channel, message);
});

// 广播消息到指定频道
function broadcastToChannel(channel, message, isError) {
 let channelHandlers = _.isEmpty( SubListenerFuns[channel] ) ? [] : SubListenerFuns[channel];
 for(let i = 0, l = channelHandlers.length; i < l; i  ) {
  let handler = channelHandlers[i] || {};
  let isOnce = handler.once || false;
  let func = handler.cb;
  let errorFunc = handler.errorCb;

  _.isFunction(func) && func(message);
  isError && _.isFunction(errorFunc) && errorFunc(message);

  isOnce && channelHandlers.splice(i, 1); // 移除一次性监听的函数
 }
}

function broadcastToAllChannels(message, isError) {
 for(let channel in SubListenerFuns) {
  broadcastToChannel(channel, message, isError);
 }
}

function onError(err) {
 err = err || {};
 err.msg = err.msg || 'redis sub/pub fail';

 // 通知所有channel执行错误回调函数
 broadcastToAllChannels(err, true);
}

module.exports = RedisMQProxy;

如上选拔全体为可选项。

2、基本配置

在利用时就能够相比好低价地调用了:

  • data_type:
    • list:blpop
    • channel:subscribe
    • pattern_channel:psubscribe
  • type:首要用于过滤

以上配置全部是可选的,未有必得的。以下列出最要紧的多少个。

const RedisMQProxy = require('./RedisMQProxy'),
 PAY_MQ_CHANNEL = 'pay_mq_channel';

// 订阅channel
RedisMQ.on(PAY_MQ_CHANNEL, function(message) {
 console.log('notification received:', message);
 // 广播消息到指定socket
 // ...
});

// 订阅一次性的channel
RedisMQ.once(PAY_MQ_CHANNEL, function(message) {
 // ...
});

// 当接收到支付数据时,推送频道消息
RedisMQ.emit(PAY_MQ_CHANNEL, {id: '01', msg: `hello ${PAY_MQ_CHANNEL}!`});

 

  • hosts:["127.0.0.1:9200","127.0.0.2:9200"]
  • action:指定es的行为,indexdeletecreateupdate
    • 默感到index:index a document(该document就是一个出自于logstash的event卡塔 尔(英语:State of Qatar)
    • delete:通过id删除一个document(须求钦点document_id)
    • create:index a document(如果该document已经在index中存在,则失败)
    • update:通过id更新二个document
  • cacert:验证server合法性的.cer或.pem文件路线
  • codec
  • document_id
  • document_type
  • index:默认值:logstash-%{ YYYY.MM.dd}
    • 福利删除老多少
    • 在语法拆解深入分析的时候,见到 号早先的,会自行以为前边是时刻格式,尝试用时间格式来剖析后续字符串。所以,在此之前管理进度中并不是给自定义的字段起多个 号开端的名字
    • 索引名中不可能有大写字母
    • 突发性也会自定义为:logstash-%{servicename}-%{ YYYY.MM.dd}
  • user:进入es cluster的用户
  • password:进入es cluster的密码
  • timeout:Set the timeout for network operations and requests sent Elasticsearch. If a timeout occurs, the request will be retried.
  • flush_size:暗许500,logstash攒够500条数据再一回性向es发送
  • idle_flush_time:暗中认可1s,假设1s内没攒够500条依然会一遍性将攒的数据发出去给es

最近该类型早已司空眼惯运转了二个多月。由于socket server的多进度间消息推送注重于redis的新闻中间转播,而Redis使用的是单进程,没能足够利用CPU。当职业膨胀的时候,redis就要考虑布满集群了。

总结

以上正是那篇作品的全体内容了,希望本文的剧情对我们的读书或许干活富有自然的参阅学习价值,要是有疑难我们能够留言交换,感谢大家对剧本之家的扶助。

你恐怕感兴趣的小说:

  • node.js使用cluster达成多进度
  • Node.js中child_process实现多进度
  • Nodejs中解决cluster模块的多进度如何分享数据难题
  • Node.js中多进度模块Cluster的介绍与行使
  • node.js中的socket.io的播音音信

本文由亚洲必赢娱乐游戏发布于亚洲必赢,转载请注明出处:出口插件之redis与es,Socket多进程间的音讯推送示

TAG标签: 亚洲必赢
Ctrl+D 将本页面保存为书签,全面了解最新资讯,方便快捷。