NodeJs回顾

NodeJS

模块与文件

require 的加载机制

模块的分类

系统模块

  • C/C++模块,也叫build-in内建模块,一般用于native模块嗲用,在require出去
  • native模块,在开发中使用的Node.js 的 http/buffer/fs等,底层也是调用的内建模块(C/C++)

第三方模块:

非Node.js自带的模块被称为第三方模块,会分成路径形式的文件模块(以.../开头的)和自定义的模块(比如express/koa/moment.js等)

在 Node.JS 中模块加载一般会经历3个步骤:路径分析、文件定位、编译执行

按照模块的分类,按照以下的顺序进行优先加载:

  • 系统缓存:模块被执行之后会进行缓存,首先是先进行缓存加载,判断换粗中是否有值。
  • 系统模块:也就是原生模块,优先级次于系统缓存,部分核心模块已经被编译成二进制,省略了路径分析、文件定位,直接加载在内存中,系统模块定义在 Node.js 源码的lib目录下。
  • 文件模块:优先加载 .../开头的,如果文件没有加上拓展名,会依次按照.js.json.node尝试进行拓展名补足。(在尝试过程中也是以同步阻塞模式来判断文件是否存在的,从性能优化角度来看,.jsonnode最好还是加上文件的拓展名)
  • 目录作为模块:文件模块加载过程中没有找到,但发现这是一个目录,这个适合就会把这个目录当做一个包来处理,Node采用了commonjs规范,会先在项目的根目录查找package.json文件,取出文件中定义的main属性描述的入口文件进行加载,也没有加载到,则会抛出默认错误:Error:Cannot find module ‘lib/hello.js’
  • node_modules目录加载:对于系统模块,路径文件模块都找不到,Nodejs会从当前模块的父目录进行查找,直到系统的根目录

module.exports 与 exports 的区别

exports相当于 module.exports 的快捷方式:

1
const exports = module.exports

但是不能改变 exports 的指向,我们可以通过 exports.test = 'a',这样来导出一个对象,但是不能通过下面的例子直接赋值,这样会改变 exports 的指向

1
2
3
4
5
6
7
8
9
10
11
// 错误的写法,将会得到 undefined
exports = {
'a':1,
'b':2
}

// 正确的写法
modules.exports ={
'a':1,
'b':2
}

模块循环引用问题

1
2
3
4
5
6
7
8
9
10
11
12
13
// a.js
console.log('a模块start');
exports.test = 1;
undeclaredVariable = 'a模块为声明变量'
const b = require('./b');
console.log('a模块加载完毕:b.test值',b.test);

// b.js
console.log('b模块start');
exports.test = 2;
const a = require('./a');
console.log('undeclaredVariable:',undeclaredVariable)
console.log('b模块加载完毕:a.test值:',a.test)

执行 node a.js,结果:

1
2
3
4
5
a模块start
b模块start
undeclaredVariable:a模块未声明变量
b模块加载完毕:a.test值:1
a模块加载完毕:b.test值:2

启动a.js会加载b.js,那么在b.js中又加载到了a.js,但是此时a.js模块hi阿咩有执行完,返回的是一个a.js模块的exports对象未完成的副本给到b.js模块(因此不会陷入死循环),然后b.js完成加载之后将exports 对象退供给a.js模块

a 模块中的 undeclaredVariable 变量在 b.js 是否会打印?

undeclaredVariable 是一个未声明的变量,也就是一个挂在全局的变量,那么在其他地方是可以拿得到的

假设有 a.js、b.js 两个模块相互引用,会有什么问题,会不会陷入死循环?

不会陷入死循环

Buffer

Buffer与Cache的区别

缓冲(Buffer)

缓冲是用于处理二进制流数据,将数据存储起来,它是临时性的,对于流式数据来说,会采用缓冲区将数据临时存储起来,等缓冲到一定大小的时候存入硬盘中。视频播放器就是一个经典例子。

缓存(Cache)

缓存可以看做一个中间层,可以是永久性的将热点数据进行缓存,使得访问速度更快。例如我们通过对 memory、Redis等将数据从硬盘或者第三方接口中请求过来进行缓存,目的就是将数据存于内存的缓存区中,这样对同一个资源进行访问,速度会更快,也就是性能优化的一个重要点。

线程与进程

什么是进程(Process)和线程(Thread),之间的区别

进程

进程是计算中程序关于数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础,进程是线程的容器。启动一个服务,运行一个实例,就是开一个服务进程。多进程就是进程的复制(fork),fork出来的每个进程都有自己的独立空间地址、数据栈,一个进程无法访问另外一个进程里定义的变量、数据结构,只有建立了IPC通信,进程之间才可以数据共享。

线程

线程是操作系统能够进行运算调度的最小单位,线程是隶属进程的,被包含与进程之中的。一个线程只能隶属一个进程,但是一个进程是可以拥有多个线程的。

同一个代码,可以根据系统CPU核心数启动多个进程,每个进程都有属于自己的独立运行空间,进程之间是不互相影响的。同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述和信号处理等。但 同一进程中的多个线程有各自的调用栈,自己的寄存器环境,自己的线程本地存储。

什么是孤儿进程

父进程创建子进程之后,父进程退出了,但是父进程对应的一个或者多个子进程还在运行,这些子进程会被系统的init进程收养,对应的程序ppid为1,这就是孤儿进程。

创建多进程时,代码里有app.listen(port)在进行fork时,为什么没有报端口被占用

端口被占用的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// master.js
const fork = require('child_process').fork;
const cups = require('os').cups();

for(let i=0;i<cups.length;i++){
const worker = fork('worker.js')
console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid);
}

// worker.js
const http = require('http');
http.createServer((req,res)=>{
res.send('I am worker,pid:'+process.pid+', ppid: '+process.pid)
}).listen(3000);

// 控制台执行node master.js只有一个worker 可以监听 3000 端口,其余会抛出 Error:listen EADDRINUSE :::3000错误

多进程模式可通过句柄传递不会有端口占用的问题。

当父子进程之间建立IPC通道之后,通过子进程对象的send方法发送消息,第二个参数sendHandle就是句柄,可以是TCP套接字,TCP服务器、UDP套接字等,为了解决上面多进程端口占用问题,将主进程的socket传递到子进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// master.js
const fork = require('child_process').fork;
const cups = require('os').cpus();
const server = require('net').createServer();
server.listen(3000);
process.title = 'node-master';

for(let i=0;i<cpus.length;i++){
const worker = fork('worker.js')
worker.send('server',server);
console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid);

if(i+1 === cpus.length){
console.log('serve close');
server.close(); // 关闭服务器监听,交由子进程处理
}
}

// worker.js
const http = require('http');
const server = http.createServer((req,res)=>{
res.end('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid);
})
let worker;
process.title = 'node-worker';
process.on('message',(message,sendHandle)=>{
if(message === 'server'){
worker = sendHandle;
worker.on('connection',(socket)=>{
server.emit('connection',socket)
})
}
})

什么是IPC通信,如何建立,什么场景下会使用

IPC(inter-process communication),即进程间通信技术,由于每个进程创建之后都有自己的独立地址空间,实现IPC的目的就是进程之间的资源共享访问,实现IPC的方式有多种:管道、消息队列、信号量、Domain Socket,Nodejs通过pipe来实现。

未使用IPC的情况:

1
2
3
4
5
6
7
8
9
// pipe.js
const spawn = require('child_process').spawn;
const child = spawn('node',['worker.js']);
console.log(process.pid,child.pid);

// worker.js
console.log('I am worker,PID:',process.pid)

// 执行 node pipe.js,输出主线程的id,子线程的id,但是子线程worker.js的信息没有在控制台打印,原因是新创建的子进程有自己的 stdio流

创建一个父进程和子进程之间传递消息的IPC通道实现输出信息

1
2
3
4
5
6
7
8
9
// 修改pipe.js让子进程的stdio和当前进程的stdio之间建立管道链接,还可以通过spawn方法的stdio选项建立IPC机制
// pipe.js
const spawn = require('child_process').spawn;
const child = spawn('node',['worker.js']);
child.stdout.pipe(process.stdout);
console.log(process.pid,child.pid);

// 父进程与子进程的通信
// 父进程在创建子进程之前会先去创建IPC通道并一直监听该通道,之后开始创建子进程并通过环境变量(NODE_CHANNEL_FD)的方式将IPC频道的文件描述符传递给子进程,子进程启动时根据传递的文件描述符去链接IPC通道,从而建立父子进程之间的通信机制。

Node.js是单线程还是多线程,为什么会单线程

javascript是单线程,在服务端运行环境的nodejs不是单线程。

浏览器环境中对于DOM的操作是单线程的,避免DOM渲染冲突,在浏览器中UI渲染线程和JS执行引擎是互斥的,一方在执行式都会导致另一方被挂起,这是由JS引擎所决定的。

关于守护进程,是什么,为什么,怎么编写?

守护进程运行在后台不受终端影响。

创建步骤

  1. 创建子进程
  2. 在子进程中创建新会话(调用系统函数setsid)
  3. 改变子进程工作目录(如:’/‘或者’/usr/‘等)
  4. 父进程终止

编写demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// index.js文件的处理逻辑使用 spawn创建子进程完成第一步,设置options.detached为true可以使得子进程在父进程退出后继续运行(系统层会调用setsid方法),这是第二步。options.cwd指定当前子进程工作目录不做设置默认继承当前工作目录,这是第三步。运行daemon.unref()退出父进程,第四步。
// index.js
const spawn = require('child_process').spawn;

function startDaemon(){
const daemon = spawn('node',['daemon.js'],{
cwd:'/usr',
datached:true,
stdio:'ignore',
})
console.log('守护进程开启 父进程 pid: %s, 守护进程 pid: %s', process.pid, daemon.pid);
daemon.unref();
}
startDaemon();

// daemon.js文件哩逻辑开启一个定时器每10秒运行一次,使得这个资源不会退出,同时写入日志到子进程当前的工作目录下
// /usr/daemon.js
const fs = require('fs');
const {Console} = require('console');

const logger = new Console(fs.createWriteStream('./stdout.log'),fs.createWriteStream('./stderr.log'));

setInterval(function(){
logger.log('daemon pid:',process.pid,'ppid: ',process.ppid)
},1000*10)

实际工作中守护进程很多,例如PM2,Egg-Cluster等,实际工作上对于守护进程的健壮性要求还是很高的,例如:进程的异常监听,工作进程管理调度,进程挂掉之后重启等等。

实现一个简单的命令行交互程序

采用子进程 child_process的spawn方法:

1
2
3
const spawn = require('child_process').spawn;
const child = spawn('echo',['简单的命令行交互']);
child.stdout.pipe(process.stdout) // 将子进程的输出作为当前进程的输入,打印在控制台

进程的当前工作目录是什么,有什么用

进程的当前工作目录可以通过process.cwd()命令获取,默认为当前启动的目录,如果是创建子进程则继承于父进程的目录,可以通过process.chdir()命令重置,例如通过spawn命令创建的子进程可以指定cwd选项设置子进程的工作目录。

有什么用,例如fs读取文件,如果设置为相对路径则是相当于当前进程启动的目录进行查找,所以,启动目的设置有误的情况下将无法得到正确的结果。还有一种情况程序里引用第三方模块也是根据当前进程启动的目录来进行查找的

多进程或者多个Web服务之间的状态共享问题

多进程模式下各个进程之间是相互独立的,例如用户登录之后seesion的保存,如果保存在服务进程里,那么如果我有4个工作进程,每个进程都要保存一份这是没有必要的。假设服务重启了数据也会丢失。多个Web服务也是一样的,还会出现在A机器创建了Session,当负载均衡分到B机器上之后还需要再创建一份,一般的做法是通过Redis或者数据库来做数据共享

什么是僵尸进程

使用fork可以创建子进程,正常情况进程退出,内核要释放进程所占用的资源:打开的文件、占用的内存等,但是进程的PID、退出状态、运行时间等会进行保留,知道父进程调用wait/waitpid来获取子进程的状态信息时,这些资源才会释放。

如果子进程退出之后,父进程没有调用wait/waitpid来获取子进程的状态,那么保留的进程号将会一直被占用,且占用系统资源,称为僵死或僵尸进程。

元凶是其父进程,我们把元凶kill掉之后,僵尸进程会变为孤儿进程被系统的 init 进程pid=1的进程所收养,init进程会对这些孤儿进程进行管理(调用wait/waitpid)释放其占用资源。

Console

console是异步还是同步的

console既不是总是同步的,也不总是异步的,是否为同步取决于链接是什么流以及操作系统是Window还是Posix

同步写将会阻塞实践循环直到写完成。

  • 文件(Files):Windows和POSIX平台都是同步
  • 终端(TTYS):Windows平台下同步,在POSIX平台下异步
  • 管道(Pipes):Windows平台下同步,POSIX平台下异步

如何实现一个console.log

可利用 process.stdout将输入流数据输出到输出流(即输出到终端)

1
process.stdout.write('xxx'+'\n')

为什么console.log()执行完就退出

一旦产生事件循环,就会长产生一个While(true)的死循环,例如定时器,console.log则没有产生watch/handlers,在事件循环一次就退出了。

Nodejs进程退出会等待异步处理完成,常见的运维过程中会碰到需要进程优雅退出的场景,Nodejs自然退出是最好的,process.exit是比较粗暴的。

常见的异步请求:

  • http请求,数据库请求等IO请求操作
  • net.Server.listen()或者http.Server.listen等端口监听
  • fs.write()类型的文件IO操作
  • console.log()输出日志
  • setTimeout()/setInterval等定时器操作
  • process.send()等异步请求发送

Net模块

OSI七层模型 TCP/IP五层模型 描述
应用层 构建于传输层之上常用的HTTP、FTP文件传输协议、SMTP邮件传输协议等
表示层 构建于传输层之上常用的HTTP、FTP文件传输协议、SMTP邮件传输协议等
会话层 构建于传输层之上常用的HTTP、FTP文件传输协议、SMTP邮件传输协议等
传输层 传输层 向用户提供可靠的端到端服务TCP、UDP
网络层 网络层 IPV4、IPV6
数据链路层 数据链路层 设备驱动和硬件
物理层 物理层 设备驱动和硬件

什么是TCP协议,什么时候会选择TCP协议

IP协议是无连接通信协议,IP协议不会占用两个设备之间通信的线路,IP实际上主要负责将每个数据包路由至目的地,但是IP协议没有能确保数据包是否到达,传过去的数据是否按照顺序排列,所以IP数据包是不可靠的。而解决数据不可靠的问题就是由TCP协议来完成。

TCP(Transmission Control Protocol)是可靠的传输控制协议,三个特点:

  • 面向链接:需要对方主机在线,并建立链接
  • 面向字节流:发送多少字节自己说了算,每次选出一段字节发送的时候,都会带上一个序号,这个序号就是发送的这段字节中编号最小的字节的编号。
  • 可靠:保证数据有序的到达对方主机,每发送一个数据就会期待收到对方的回复,在指定时间内收到了ACK回复,就确认数据到达,如果超过一定的时间没有收到对方的回复,就认为对方没有收到,再重新发送一次。

TCP报文

源端口(16) 目的端口(16)
TCP序号(32) TCP序号(32)
捎带的确认(32) 捎带的确认(32)
首部长度(4)保留(6)Flag(6:URG、ACK、PSH、RST、SYN、FIN) 窗口尺寸(16)
TCP校验和(16) 紧急指针(16)
数据包内容 数据包内容

6个标志位:

  • URG,紧急指针标志,当为1时表示紧急指针有效,为0时则忽略紧急指针
  • ACK,确认序号标志,为1表示确认有效,为0表示报文不含有确认信息,确认号无误
  • PSH,push标志,当为1时就是让接收方收到该TCP报文的时候不进入缓冲区排队而是快速发送给应用程序
  • RST,重置连接标志,当连接出现错误的时可以重置,或者用于拒绝非法的报文段和连接请求
  • SYN,同步序号,用于建立连接过程
  • FIN,finish标志,用于释放连接

3次握手协议:

  1. 第一次握手,当客户端需要去建立连接时,客户端就会发送SYN包(seq=x)到服务器,然后客户端进入SYN_SEND的状态,代表已经发SYN包过去,并且在等待服务器确认。此时,ACK=0,SYN=1。
  2. 第二次握手,服务器收到SYN包,会进行确认,由上面的标志知道SYN是表示同步序号,这时候会使得 确认号=序号+1,即ack等于x+1,然后服务器也会像客户端发送一个SYN包(seq=y),也就是服务器会发送SYN+ACK包,来表示确认到了客户端的一次握手并且二次握手建立,此时服务器进入SYN_RECV状态。此时,ACK=1,SYN=1。
  3. 第三次握手,客户端收到服务器的SYN+ACK包,然后就会向服务器发送确认包ACK(ack=y+1)和SYN(seq=x+1),等到这个包发送完毕之后客户端和服务器就会进入ESTABLISHED状态,完成三次握手,就可以在服务器与客户端之间传输数据了。

SYN是同步序号,当SYN=1而ACK=0时表明这是一个连接请求报文,对方若同意连接,那应在报文中使SYN=1和ACK=1,因此SYN置1表示这是一个连接请求或者连接接受报文。而ACK状态是用来确认是否同意连接。也就是传了SYN,证明发送方到接收方的通道没有问题,但是接收方到发送方的通道还需要ACK信号来验证

当在传送完数据之后,客户端与服务器端之间有四次握手协议:

  1. 第一次握手:客户端发送一个FIN和序号过去(seq=u)用来表示客户端和服务端之间有关闭的请求,同时关闭客户端的数据传送,客户端就进入FIN_WAIT_1的状态
  2. 第二次握手:服务端收到FIN=1的标志位,就会发送一个ACK标志位表示确认,然后确认序号就变成了收到的序号+1,即ack=u+1(FIN和SYN在这点相同,但是作用不一样)这时候服务端进入CLOSE_WAIT状态,这是一个半关闭状态。只能服务端给客户端发送数据而客户端不能给服务端发送数据
  3. 第三次握手:这次握手还是服务端发起的。这是服务端在传完最后的数据(没有就不传)就会发送一个FIN=1和ACK=1,且序号seq会改变(没有传数据则不变),而ack不变,这时候服务端就会进入LAST_ACK状态,表示最后再确认一次。
  4. 第四次握手:客户端在接收到FIN之后,就会进入TIME_WAIT状态,接着发送一个ACK和seq=u+1,ack=w+1给服务端,这时候服务端就会进入CLOSED状态。而客户端进入TIME_WAIT状态的时候必须要等待2MSL的时间才会关闭

TIME_WAIT状态的作用?(MSL:网络中数据报文存在的最大时间)

  1. TIME_WAIT状态可以确保有足够的时间让对方接收到ACK包,如果ACK没有到达,在传输过程丢失了或者一些其他原因,这样就可以让客户端重发ACK包,如果客户端直接关闭了,那么就有可能导致服务端在一些情况下没有接受到ACK包而无法与客户端断开连接。这样客户端发送ACK包到服务端,服务端请求重发,一来一回就刚好是2MSL
  2. 保证迟来的TCP报文段有足够的时间被识别并丢弃,linux中一个TCPort不能打开两次或者两次以上。当client处于time_wait状态时无法使用此port建立新连接,假设不存在time_wait状态,新连接可能会受到旧连接的数据

TCP粘包是什么,该怎么办

客户端(发送一端)在发送之前会将短时间有多个发送的数据块缓冲到一起(发送缓冲区),形成了一个大的数据块一并发送,同样接收端也有一个接收缓冲区,收到的数据先存放在接收端缓冲区,然后程序从这里读取部分数据进行消费,这样做也是为了减少I/O消耗达到性能优化。

数据达到缓冲区什么时间开发发送这个取决于TCP拥塞控制,是任何时刻内确定能被发送出去的字节数的控制因素之一,是阻止发送方至接收方之间的链路变得拥塞的手段

TCP粘包解决方案:

  1. 延迟发送:设置延迟发送,sleep休眠一段时间。简单但是传输效率大大降低,只适用于交互频率低的情况

  2. 关闭nagle算法。nagle算法是一种改善网络传输效率的算法,避免网络中充斥着大量小的数据块,它所期望的是尽可能发送大的数据块,因此在每次请求一个数据块给TCP发送时,TCP并不会立即执行发送,而是等待一小段时间进行发送。

    当网络中充斥着大量小的数据块时,Nagle算法能将小的数据块集合起来一起发送减少了网络拥堵,但并不是所有场景都需要这样。例如,REPL终端交互,当用户输入单个字符以获取响应,所以在nodejs中可以设置 socket.setNoDelay方法来关闭Nagle算法。const server = net.createServer(); server.on('connection',socket=>{socket.setNoDelay(true)})

  3. 封包/拆包。使用长度编码的方式,通信双方约定好格式,将消息分为定长的消息头(Header)和不定长的消息体(Body),在解析时读取消息头获取到内容的占用的长度,之后读取到的消息体内容字节数等于字节头的字节数时,认为它是一个完整的包。

消息头序号(Header) 消息体长度(Header) 消息体(Body)
SerialNumber bodyLength body
2字节 2字节 N字节

Buffer的几个api:

  • Buffer.alloc(size[,fill[,encoding]]),初始化一个size大小的Buffer空间,默认填充0,也可以指定fill进行自动以填充
  • Buffer.writeInt16BE(value[,offset]),value为要写入的Buffer值,offset为偏移量从哪个位置开始写入
  • Buffer.writeInt32BE(value[,offset]),value为要写入的Buffer值,不同的是writeInt16BE表示高位优先写入一个16位整型,这个是32位
  • Buffer.readInt16BE([offset]),高位优先读取16位整型,offset为读取之前要跳过的字节数
  • Buffer.readInt32BE([offset]),高位优先读取32位整型,offset为读取之前要跳过的字节数

编码/解码的实现

TCP顶层是基于二进制数据,应用层通常是易于表达的字符串、数字等,需要先将数据通过Buffer转换为二进制,取出的时候同样需要解码操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// transcoder.js
class Transcoder {
constructor() {
this.packageHeaderLen = 4; // 包头长度
this.serialNumber = 0; // 定义包序号
this.packageSerialNumberLen = 2; // 包序列号所占用的字节
}
/**
* 编码
* @param {Object} data Buffer 对象数据
* @param {Int} serialNumber 包序号,客户端编码时自动生成,服务器解码之后在编码时需要传入解码的包序号
*/

encode(data, serialNumber) {
const body = Buffer.from(data);
const header = Buffer.alloc(this.packageHeaderLen);
header.writeInt16BE(serialNumber || this.serialNumber);
header.writeInt16BE(body.length, this.packageSerialNumberLen); // 跳过包序号的前两位

if (serialNumber === undefined) {
this.serialNumber++;
}
return Buffer.concat([header, body])
}

/**
* 解码
* @param {Object} buffer
*/

decode(buffer) {
const header = buffer.slice(0, this.packageHeader); // 获取包头
const body = buffer.slice(this.packageHeaderLen); // 获取包尾部

return {
serialNumber: header.readInt16BE(),
bodyLength: header.readInt16BE(this.packageSerialNumberLen), // 因为编码阶段跳过两位,所以解码也需要跳过
body: body.toString(),
}
}

/**
* 获取包长度两种情况
* 1. 如果当前buffer长度数据小于包头,肯定不是一个完整的数据包,因此直接返回0不做处理(可能数据还没有接收完)
* 2. 否则返回这个完整的数据包长度
* @param {*} buffer
*/

getPackageLength(buffer) {
if (buffer.length < this.packageHeaderLen) {
return 0;
}
return this.packageHeaderLen + buffer.readInt16BE(this.packageSerialNumberLen)
}
}


module.exports = Transcoder;

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
const net = require('net');
const Transcoder = require('./transcoder');

const transcoder = new Transcoder();

const client = net.createConnection({
host: '127.0.0.1',
port: 3000
})

let overageBuffer = null; //上一次Buffer剩下的数据


client.on('data', buffer => {
if (overageBuffer) {
buffer = Buffer.concat([overageBuffer, buffer])
}

let packageLength = 0;

// eslint-disable-next-line no-cond-assign
while (packageLength = transcoder.getPackageLength(buffer)) {
const packageData = buffer.slice(0, packageLength); // 取出整个数据包
buffer = buffer.slice(packageLength); // 删除已经取出的数据包,这里采用的方法是把缓冲区(buffer)已取出的包给截掉
const result = transcoder.decode(packageData); // 解码
console.log(result)
}

overageBuffer = buffer; // 记录剩余不完整的包
}).on('error', err => { // 监听一个未开启的端口就会报 ECONNREFUSED错误
console.log(`服务器异常: ${err}`)
}).on('close', err => {
console.log(`客户链接断开!, ${err}`)
})

client.write(transcoder.encode('Nodejs 技术栈'))

const arr = [
'1 JavaScript ',
'2 TypeScript ',
'3 Python ',
'4 Java ',
'5 C ',
'6 PHP ',
'7 ASP.NET ',
];

setTimeout(() => {
for (let i = 0; i < arr.length; i++) {
console.log(arr[i])
client.write(transcoder.encode(arr[i]))
}
}, 1000)

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
const net = require('net');
const Transcoder = require('./transcoder');
const transcoder = new Transcoder();
const HOST = '127.0.0.1';
const PORT = 3000;
let overageBuffer = null; // 上一次善剩余数据

// 创建一个TCP服务实例
const server = net.createServer();

// 监听端口
server.listen(PORT, HOST)

server.on('listening', () => {
console.log(`服务已经开启在${HOST}:${PORT}`)
}).on('connection', socket => {
// data事件就是读取数据
socket.on('data', buffer => {
if (overageBuffer) {
buffer = Buffer.concat([overageBuffer, buffer])
}
let packageLength = 0;
// eslint-disable-next-line no-cond-assign
while (packageLength = transcoder.getPackageLength(buffer)) {
const packageData = buffer.slice(0, packageLength); //取出整个数据包
buffer = buffer.slice(packageLength); // 删除取出的数据包,这里采用的方法是把缓冲区buffer已取出的包截掉
const result = transcoder.decode(packageData); // 解码
console.log(result);
socket.write(transcoder.encode(result.body, result.serialNumber))
}
overageBuffer = buffer; // 记录不完整的包
}).on('end', () => {
console.log('socket end')
}).on('error', error => {
console.log('socket error', error)
})
}).on('close', () => {
console.log('Server Close!')
}).on('error', err => {
if (err.code === 'EADDRINUSE') {
console.log('地址正被使用,重试中......')

setTimeout(() => {
server.close();
server.listen(PORT.HOST)
}, 1000)
} else {
console.log(`服务器异常: ${err}`)
}

})

DNS

DNS模块是基于UDP协议来实现的,在Nodejs中可以通过require('dns')实现域名的解析查询,Nodejs DNS模块分成两大类:

  1. 底层操作系统工具进行域名解析
  2. 链接到一个DNS网络服务器执行域名解析

底层操作工具域名解析

1
2
3
4
5
6
// Nodejs DNS模块的 dns.lookup()方法使用底层操作系统进行域名解析,是不需要经过网络通信的
const dns = require('dns');

dns.lookup('laibh.top',(err,address,family)=>{
console.log(`地址: ${address},地址族:${family}`)
})

链接到DNS服务器执行域名解析

1
2
3
4
5
// dns 模块除了 dns.lookup之外的函数,都会连接到实际DNS服务器以执行名称解析并始终使用网络执行DNS查询
const dns = require('dns');
dns.lookup('laibh.top',(err,records)=>{
console.log(records)
})

dns.lookup与dns.resolve不同

虽然用异步的角度来使用dns.lookup,但是内部的libuv底层线程池中确实同步的调用 getaddrinfo(3),所以可能有由于一些不确定的因素造成Node进程阻塞

与dns.lookup不同的是dns.resolve没有使用getaddrinfo(3),而通过网络执行的DNS查询,始终保持异步不会对其他进程产生负面影响

DNS域名解析过程

  1. 浏览器DNS缓存。访问一个URL优先查找浏览器的DNS缓存,命中就返回。未命中就继续下一步,查找操作系统的缓存。当修改了本地hosts域名指向发现浏览器缓存没有变化是因为每个浏览器有一个固定值。
  2. 系统(OS)缓存。查看操作系统中是否有域名对应的IP,位于操作系统的hosts文件。
  3. 路由器缓存。当浏览器DNS与系统OS缓存均没有映射的时候,则请求会发送到路由器缓存中检查
  4. ISP DNS缓存。ISP为互联网服务提供商。

DNS本地解析指的是系统缓存这一阶段,在浏览器缓存没有命中的情况下, 会从本地系统的一个hosts文件中寻找对应的IP

Cluster (集群)

在PM2的配置文件中可以设置exec_model:'clusterinstance两个属性来设置开启多个进程,PM2其实就是利用Nodejs Cluster这个模块来实现的,还有eggJs中的egg-cluster模块在启用Worker进程也是用到这个模块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
console.log(`Master 进程 ${process.pid} 正在运行`)

for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}

cluster.on('exit', (worker, code, signal) => { console.log(`Worker ${worker.process.pid} 已退出`) });
} else {
http.createServer((req, res) => {
res.send(`你好,哈哈哈 ${process.pid}`)
}).listen(8000);
console.log(`Worker 进程 ${process.pid} 已启用`)
}

采用了哪种集群方式

集群模式通常实现有两种:

  1. 1个Node实例开启多个端口,通过反向代理服务器向各端口服务进行转发
  2. 1个Node实例开启多个进程监听同一个端口,通过负载均衡技术分配请求(Master->Worker)

第一个方案存在的一个问题就是占用多个端口,造成资源浪费,由于多个实例是独立运行的,进程间通信不太好做。好处是稳定性高,各实例之间没有影响。

第二个方案多个Node进程去监听同一个端口,好处是进程间通信相对简单,减少了端口的资源浪费,但是这个时候需要保证服务进程的稳定性,特别是对Master进程稳定性要求会更高,编码也会复杂。

Nodejs中自带的Cluster模块正是采用了第二种方案。

多个进程为什么可以监听同一个端口

端口不是被所有的进程全部监听,仅仅受到Master进程的监听。Master进程创建一个Socket并绑定监听到目标端口,通过子进程之间建立IPC通道之后,通过调用子进程的send方法,将Socket(链接句柄)传递过去。(Master通过cluster.fork方法创建的,本质上还是使用了child_process.fork这个方法)

使用 child_process.fork()创建的子进程,进行Socket传递的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// master.js
const fork = require('child_process').fork;
const cpus = require('os').cpus();
const server = require('net').createServer().listen(3000);

for (let i=0; i<cpus.length; i++) {
const worker = fork('worker.js');
// 将 Master 的 server 传递给子进程
worker.send('server', server);
console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid);
}
// worker.js
const http = require('http');
const server = http.createServer((req, res) => {
res.end('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid);
});

let worker;
// 第二个参数 sendHandle 就是句柄,可以是 TCP套接字、TCP服务器、UDP套接字等
process.on('message', function (message, sendHandle) {
if (message === 'server') {
worker = sendHandle;
worker.on('connection', function(socket) {
server.emit('connection', socket);
});
}
});

端口会被主进程绑定监听一次,但是主进程和子进程在建立IPC通信之后,发送Socket到子进程实现端口共享,在之后Master接受到新的客户端链接后,通过负载均衡技术再转发到各Worker进程。

多个进程之间如何通信

由于cluster.fork本质上还是使用child_process.fork()这个方法来创建子进程,进程间通信无非几种:pipe(管道)、消息队列、信号量、Domain Socket。Nodejs中是通过pipe(管道)实现的,pipe作用于之间有血缘关系的进程,通过fork传递,其本身也是一个进程,将一个进程的输出作为另外一个进程的输入。

如何对多个Worker进行请求转发

在Nodejs中使用了RoundRobin负载均衡策略,简称RP,它的实现原理是一种无状态的轮询策略,假定每台服务器的硬件资源、处理性能都是相同的,根据进程的数量,依次分配,直到所有进程处理完了,再开始重新计算分配。优点是实现起来简洁也易用,缺点是如果出现某个请求占用的时间较长,就会导致负载不会太均衡。

RP这种负载均衡技术适用于同一组服务器拥有相同的软硬件配置且平均的服务请求响应

RP是一种常见的复杂均衡技术,Nginx中也有使用,另外在RP的基础上还衍生了一个Weighted Round-Robin权重负载均衡轮询算法,简称WRR,同样也是使用轮询的技术,但是在基础上考虑了服务器的处理能力,实现时为服务器加上权重,这种负载均衡算法能够确保高性能的服务器得到更多的使用率,避免低性能的服务器负载过重。

Nodejs负载均衡策略设置

  • RoundRobin,RR。设置时要使用cluster.SCHED_RR,如果通过环境变量设置要使用rr,如果用cluster对象获取 schedulingPolicy数字表示为2
  • Shared Socket,SS,设置时要用cluster.SCHED_NONE,如果通过环境变量设置要用node,如果用cluster对象获取schedulingPolicy数字表示为1
1
2
3
4
5
6
7
8
9
10
11
12
13
// cluster对象的schedulingPolicy属性设置
const cluster = require('cluster');

// 策略一:一种轮询的策略,默认值
cluster.schedulingPolicy = cluster.SHCED_RR;

// 策略二:由操作系统调度的策略
cluster.schedulingPolicy = cluster.SCHED_NONE;

cluster.fork();

// 或者通过环境变量 NODE_CLUSTER_SCHED_POLICY设置:
env NODE_CLUSTER_SCHED_POLICY = 'none' node app.js

基于Stream实现多文件合并

一个简单的Stream操作

创建一个可读流readable一个可写流writeable,通过管道pipe将可写流绑到可读流,一个简单的Stream操作就可以完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const fs = require('fs');
const readable = fs.createReadStream('./log/read.txt');
const writeable = fs.createWriteStream('./log/write.txt');

readable.pipe(writeable)

// readable.pip(destionation[,option])
// destionation:是一个可写流对象,也就是一个数据写入的目标对象
// options:end,读取结束时终止写入流,默认值是true

// 默认情况下不需要手动调用写入流的end方法关闭的,更改end为false写入的目标将会处于一直打开状态,此时就需要监听可读流的end时间,结束之后手动调用可写流的end事件。

readable.pipe(writeable,{
end:false
});

readable.on('end',()=>{
writeable.end('结束')
})

如果可读流期间发什么什么错误,则写入的目标流将不会关闭,所以需要监听错误事件,手动关闭可写流,防止内存泄露。

多个文件通过Stream合并成一个文件

设置可读流的end为false可以保持写入流一直处于打开状态,通过这种方式,一开始可写流处于打开状态,知道所有的可读流结束,我们再将可写流关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
const fs = require('fs');
const path = require('path');

/**
* Stream 合并
* @param {String} sourceFiles 源文件目录名
* @param {String} targetFile 目标文件
*/

function streamMerge(sourceFiles, targetFile) {
const scripts = fs.readdirSync(path.resolve(__dirname, sourceFiles)); // 获取源文件目录下的所有文件
const fileWriteStream = fs.createWriteStream(path.resolve(__dirname, targetFile)); // 创建一个可写流
streamMergeRecursive(scripts, fileWriteStream);
}


/**
* Stream 合并的递归调用
* @param {Array} scripts
* @param {Stream} fileWriteStream
*/

function streamMergeRecursive(scripts = [], fileWriteStream) {
// 递归到尾 的情况判断
if (!scripts.length) {
return fileWriteStream.end("console.log('Stream 合并完成')") // 最后关闭可写流,防止内存泄露
}
const currentFile = path.resolve(__dirname, 'scripts/', scripts.shift());
const currentReadStream = fs.createReadStream(currentFile); // 获取当前的可读流

currentReadStream.pipe(fileWriteStream, { end: false });
currentReadStream.on('end', () => {
streamMergeRecursive(scripts, fileWriteStream)
})

currentReadStream.on('error', (error) => { // 监听错误事件,关闭可读流,防止内存泄露
console.log(error);
fileWriteStream.close()
})
}

streamMerge('./scripts', './script.js')

Stream pipe的使用与实现原理

通过流我们可以将一大块数据拆分称为一小部分一点一点的流动起来,不需要一次性全部读入,在Linux下可以通过|符号实现,类似的在Nodejs的Stream模块中同样也为我们提供了 pipe方法来实现

未使用Stream pipe的情况

在Nodejs中I/O操作都是异步的,先用util模块的promiseify方法将fs.readFile的callback形式转换为Promise形式

,它将数据一次性读入内存然后再进行返回,当数据文件很大的时候也是对内存的一种消耗,不推荐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// koa 的例子
const Koa = require('koa');
const fs = require('fs');
const app = new Koa();
const {promisify} = require('util');
const {resolve} = require('path');
const readFile = promisify(fs.readFile);

app.use(async ctx=>{
try{
ctx.body = await readFile(resolve(__dirname,'test.json')))
}catch(err){
ctx.body = err
}
}).listen(3000)

使用Steam pipe

1
2
3
4
5
6
7
8
9
10
11
12
13
app.use(async ctx=>{
try{
const readable = fs.createReadStream(resolve(__dirname,'test.json'));
ctx.body = readable;
}catch(err){
ctx.body = err;
}
})
// 在Koa中直接创建一个可读流赋值给ctx.body,框架内封装好了pipe方法,下面为源码
function respond(ctx){
let body = ctx.body;
if(body instanceof Stream) return body.pipe(res)
}

使用与不使用Stream

使用了可读流,通过pipe接口监听data与end事件,把data的可读流拆分称为一小块一小块的数据(chunks),像流水一样源源不断吐给客户端,而不再需要等待整个文件都加载到内存后才发送数据。pipe可以视为流的管道/通道方法,任何类型的流都会有这个方法来处理流的输入与输出。

总体来说,使用流可以大大提升响应时间,又能有效减轻服务器内存的压力

源码分析

在应用层调用 fs.createReadStream 方法,找到这个方法创建的可读流对象pipe的方法实现

/lib/fs.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 导出一个createReadStream方法,在这个方法里面创建一个ReadSream可读流对象,且ReadStream来自internal/fs/streams

// 懒加载,主要在用到的时候用来实例化 ReadStream/WriteStream等对象
function lazyLoadStreams(){
if(!ReadStream){
({ReadStream,WriteStream}) = require('internal/fs/streams');
[FileReadStream,FileWriteStream] = [ReadStream,WriteStream];
}
}

function createReadStream(path,options){
lazyLoadStreams();
return new ReadStream(path,options); // 创建一个可读流
}

module.exports = fs = {
createReadStream, // 导出 createReadStream 方法
}

/lib/internal/fs/streams.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 这个方法定义了构造函数 ReadStream,且在原型上定义了 open、_read、_destroy等方法,没有pipe方法,通过ObjectSetPrototypeOf方法实现了继承,ReadStream继承了Readable在原型中定义的函数,继续查找Readable的实现

const {Readable,Writeable} = require('stream');


function ReadStream(path,options){
if(!(this instanceof ReadStream)) return new ReadStream(path,options)

Readable.call(this,options)
}

ObjectSetPrototypeOf(ReadStream.prototype,Readable.prototype);
ObjectStreamProtptypeOf(ReadStream,Readable);

ReadStream.prototype.open = function(){}
ReadStream.prototype._read = function(n){}
ReadStream.prototype._destroy = function(err,cb)

module.exports = {
ReadStream,
WriteStream
}

/lib/stream.js

1
2
3
4
5
6
7
8
9
// to avoid cross-reference(require) issue

const Stream = module.exports = require('internal/streams/legacy');

Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable')
Stream.Duplex = require('_stream_duplex');
Stream.Transform = requier('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');

/lib/internal/streams/legacy.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 继承了Events 模块,然后在原型上定义了pipe方法,而_stream_readable继承了Stream之后又自己实现了pipe方法
const {ObjectSetPrototypeOf} = primordials;
const EE = require('events');

function Stream(opts){
EE.call(this,opts)
}

ObjectSetPrototypeOf(Stream.prototype,EE.prototype);
ObjectSetPrototypeOf(Stream,EE);

Stream.prototype.pipe = function(dest,options){
// ...
}
module.exports = Stream;

/lib/_stream_readable.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 定义了Readable构造函数,且继承于lib/stream.js的Stream,然后重写pipe方法
module.exports = Readable;
Readable.ReadableState = ReadableState;

const EE = require('events');
const Stream = require('stream');

ObjectSetPrototypeOf(Readable.prototype,Stream.prototype)
ObjectSetPrototypeOf(Readable,Stream);

function Readable(options){
if(!(this instanceof Readable)) return new Readable(options)

Stream.call(this,options); // 继承自 Stream构造函数的定义
}
_stream_readable.js实现分析

1.声明构造函数Readable,继承Stream的构造函数和原型。

文件继承了events事件,拥有了events在原型中定义的属性,例如on、emit

2.声明pipe方法,订阅data事件

在Stream原型上声明pipe方法,订阅data事件,src为可读对象,dest为可写流对象。在使用pipe方法的时候也是监听的data事件,一边读取一边写入数据。

ondata方法的几个核心的实现:

  • dest.write(chunk):接受chunk写入数据,如果内部的缓冲小于创建流时配置的highWaterMark,则返回true(缓存未满),否则返回false时应该停止向流写入数据,直到‘drain’(清空缓存)事件被触发
  • src.pause():可读流会停止data事件,意味着此时暂停数据写入了

调用src.pause是为了防止读入数据过快来不及写入,如果缓存未满即dest.write(chunk),这个缓存是根据创建流时创建的highWaterMark属性,默认为16384(16k),对象模式的流默认为16

1
2
3
4
5
6
7
8
9
10
11
Readable.prototype.pipe = function(dest,options){
const src = this;
src.on('data',ondata);
function ondata(){
const ret = dest.write(chunk);
if(ret === false){
...
src.pause();
}
}
}

3.订阅drain事件,继续流动数据。继续写入事件到流时会触发drain事件,也就是dest.write(chunk)等于false(缓存满了)时,如果ondrain不存在则注册drain事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Readable.prototype.pipe = function(dest,options){
const src = this;
src.on('data',ondata);
function ondata(){
const ret = dest.write(chunk);
if(ret === false){
...
if(!ondrain){
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
ondrain = pipeOnDrain(src);
dest.on('drain',ondrain);
}
src.pause();
}
}
// 当写入流dest耗尽时,它将会在可读流对象 source上减少 awaitDrain计数器,为了确保所有需要缓存的写入都完成,即state.awaitDrain === 0和src可读流上的data事件存在,切换流到流动模式
function pipeOnDrain(src){
return function pipeDrainFunctionResult(){
const state = src._readableState;
debug('pipeOnDrain',state.awaitDrain);
if(state.awaitDrain){
state.awaitDrain--;
}
if(state.awaitDrain ===0 && EE.listenerCount(src,'data')){
state.flowing = true;
flow(src)
}
}
}

// Stream.read() 从内部缓存拉取并返回数据,如果没有可读的数据,则返回null,在可读流上src还有一个readable属性,如果可以安全地调用readable,read(),则为true
function slow(stream){
const state = stream._readableState;
debug('flow',state.flowing);
while(state.flowing && stream.read() !== null)
}
}

4.触发data事件。调用readable的resume方法,触发可读流的data事件,进入流动模式

1
2
3
4
5
6
7
8
Readable.prototype.pipe = function(dest,options){
const src = this;
// start the flow if it hasnot been started already.
if(!state.flowing){
debug('pipe resume')
src.resume();
}
}

resume方法内部又调用resume_(),最终执行了stream.read(0)读取了一次空数据(size设置为0),将会触发实例上的_read()方法,再触发data事件。

1
2
3
4
5
6
7
8
9
10
function resume(stream,state){
process.nextTick(resume_,stream,state)
}

function resume_(stream,state){
debug('resume',state.reading);
if(!state.reading){
stream.read(0);
}
}

5.订阅end事件

end事件:当可读流中没有数据可供消费时触发,调用onend函数,执行dest.end()方法,表明已没有数据要被写入可写流,进行关闭(关闭可写流的id),之后再调用stream.write会导致错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Readable.prototype.pipe = function(dest,options){
const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout &&
dest !== process.stderr;
const endFn = doEnd?onend:unpipe;
if(state.endEmitted){
process.nestTick(endFn)
}else{
src.once('end',endFn)
}
dest.on('unpipe',onunpipe)

function onend(){
debug('onend');
dest.end();
}
}

6.触发pipe事件,传入可读流对象

1
2
3
4
Readable.prototype.pipe = function(dest,options){
const source = this;
dest.emit('pipe',src);
}

在应用层使用但的时候可以在可写流上订阅pipe事件,做一些判断。

7.支持链式调用,最后返回dest

1
2
3
Stream.protptype.pipe = function(dest,options){
return dest;
}

/lib/internal/streams/legacy.js模块实现分析

声明构造函数Stream

声明构造函数Stream继承于事件events,此时就拥有了events在原型定义的属性,例如on/emit等方法

1
2
3
4
5
6
7
8
9
const {ObjectSetPrototypeOf} = primordials;

const EE = require('events');

function Stream(opts){
EE.call(this,opts)
}
ObjectSetPrototypeOf(Stream.prototype,EE.prototype);
ObjectSetPrototypeOf(Stream,EE)

声明pipe方法,订阅data事件

在Stream原型上声明pipe方法,订阅data事件,source为可读流对象,dest为可写流对象

在使用pipe方法的时候也是监听的data事件,一边读取数据一边写入数据

ondata方法的几个API:

  • dest.writable:如果调用writable.write()是安全的,则为true
  • dest.write(chunk):接收chunk写入数据,如果内部的缓冲小于内部创建流时配置的highWaterMark,则返回true,否则返回false时应该停止向流写入数据,知道drain事件被触发。
  • source.pause():可读流会停止data事件,意味着此时暂停数据写入了
1
2
3
4
5
6
7
8
9
Stream.prototype.pipe = function(dest,options){
const source = this;
function ondata(chunk){
if(dest.writable && dest.write(chunk) === false && source.pause){
source.pause();
}
}
source.on('data',ondata)
}

订阅drain事件

如果调用dest.write(chunk)返回false,就会调用source.pause()停止数据流动,继续写入事件到流时会触发drain事件,ondrain方法的几个API:

  • source.readable:如果可以安全地调用readable.read(),则为true,例如数据未读到末尾,则会返回true,表示可读的。
  • source.resume():将被暂停的可读流恢复触发data事件,并将流切换流动模式
1
2
3
4
5
6
7
8
Stream.prototype.pipe = function(dest,options){
function ondrain(){
if(source.readable && source.resume){
source.resume();
}
}
dest.on('drain',ondrain)
}

选项指定end属性,订阅end,close事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 如果end选项没有被提供,可读流订阅end或者close事件,后续将会触发该事件,指定dest.end方法,仅被调用一次,didOnEnd变量做了控制,主要是为了关闭可写流的id
// close:当流或者底层资源(比如文件描述符)被关闭时触发close事件
// end: 当可读流中没有数据可供消费的时候触发
// 可读流的end,destroy方法
// dest.end() 表明已经没有数据要被写入可写流,进行关闭,之后再调用stream.write会导致错误
// dest.destory() 销毁流
Stream.prototype.pipe = function(dest,options){
if(!dest._isStdio && (!options || options.end !== false)){
source.on('end',onend);
source.on('close',onclose);
}
let didOnEnd = false;
function onend(){
if(didOnEnd) return;
didOnEnd = true;
dest.end();
}

function onclose(){
if(didOnEnd) return;
didOnEnd = true;
if(typeof dest.destory === 'function'){
dest.destroy();
}
}
}

订阅可读流与可写流的error事件

可读流、可写流发生错误时触发error事件,调用onerror方法,首先移除可读流,可写流订阅的所有事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Stream.prototype.pipe = function(){
function onerror(er){
cleanup();
if(EE.listenerCount(this,'error') == 0){
throw er;
}
}

source.on('error',onerror);
dest.on('error',onerror);

function cleanup(){
source.removeListener('data',ondata);
dest.removeListener('drain',ondrain);

source.removeListener('end',onend);
source.removelistener('close',onclose);

source.removeListener('error',onerror);
dest.removelistener('error',onerror);

source.removeListener('end',cleanup);
source.removelistener('close',cleanup);

dest.removelistener('close',cleanup);
}
}

触发pipe事件

在pipe方法里面最后还会触发一个pipe事件,传入可读流对象

1
2
3
4
Stream.prototype.pipe = function(dest,options){
const source = this;
dest.emit('pipe',source);
}

支持链式调用

最后返回dest,支持A.pipe(B).pipe(c)的写法:

1
2
3
Stream.prototype.pipe = function(dest,options){
return dest;
}

util.promisify如何将Callback转换为Promise

util模块提供了很多工具函数,其中promisify方法可以将callback转换为Promise对象,解决回调地狱的问题。

简单实现版本

util promisify基本使用

将callback转为promise对象,首先确保这个callback为一个错误优先的回调函数,即(err,value)=>err指定一个错误参数,value为返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 创建一个text.txt文件,写入一些自定义内容,使用fs.readFile来读取这个文件进行测试
// 传统的Callback写法
const util = require('util');
fs.readFile('text.txt','utf8',(err,result)=>{
console.log('Error',err);
console.log('Result: ',result)
})

// Promise写法
const {promisify} = require('util');
const readFilePromisify = util.promisify(fs.readFile); // 转换为Promise

readFilePromisify('text.txt','utf8')
.then(result=>console.log(result))
.catch(err=>console.log(err))

自定义mayJunPromisify函数实现

自定义mayJunPromisify函数实现callback转换为promise,核心实现如下:

  • 校验传入的参数original是否为Function,不是则抛错
  • promisify(fs.readFile)执行之后会返回一个函数fn,定义待返回的函数后返回
  • fn返回的是一个Promise对象,在返回的Promise对象中执行callback函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
function mayJunPromisify(original){
if(typeof original !== 'function'){
throw new Error('The "original" argument must be of type Function,Received type undefined')
}

function fn(...args){
return new Promise((resolve,reject)=>{
try{
original.call(this,...args,(err,result)=>{
if(err){
reject(err)
}else{
resolve(result)
}
})
}catch(err){
resolve(result)
}
})
}
return fn;
}

util.promisify.custom基本使用

另一个功能是可以使用util.promise.custom符号重写util.promisify返回值。

在fs.readFile上定义util.promisify.custom符号,其功能为禁止读取文件

1
2
3
4
5
6
7
8
9
// 注意顺序要在 util.promisify之前
fs.readFile[util.promisify.custom] = ()=>{
return Promise.reject('该文件暂时禁止读取')
}

const readFilePromisify = util.promisify(fs.readFile);
readFilePromisify('text.txt','utf8')
.then(result=>console.log(result))
.catch(err=>console.log(err)) // 该文件暂时禁止读取

自定义mayJunPromisify.custom实现

  • 定义一个Symbol变量kCustomPromisifiedSymbol赋予mayJunPromisify.custom
  • 校验是否有自定义的promise函数
  • 自定义的mayJunPromisified.custom也要保证是一个函数,否则抛错
  • 直接返回自定义的mayJunPromisify.custom函数,后续的fn函数就不会执行了,所以在这里重写util.promisify返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 所以说util.promisify.custom是一个符号
const kCustomPromisifiedSymbol = Symbol('util,promisify.custom');
mayJunPromisify.custom = kCustomPromisifiedSymbol;

function mayJunPromisify(original){
if(typeof original !== 'function'){
throw new Error('The "original" argument must be of type Function,Received type undefined')
}

if(original[kCustomPromisifiedSymbol]){
const fn = original[kCustomPromisifiedSymbol];
if(typeof fn !== 'function'){
throw new Error('The "mayJunPromise.custom" property must be of the type Function,Received type number')
}
return Object.defineProperty(fn,kCustomPromisifiedSymbol,{
value:fn,
enumerable:false,
writable:false,
configurable:true
})
}

function fn(...args){}
return fn;
}

util.promisify回调函数多参转换

有些函数的回调形式是多个参数的,例如dns.lookup,它的回调形式是(err,address,family)=>...拥有 三个参数,对这种情况也做兼容

基本使用:

1
2
3
4
5
6
7
const dns = require('dns');
const lookupPromisify = util.promisify(dns.lookup);

lookupPromisify('laibh.top')
.then(({address,family})=>{
console.log('地址:',address,'地址族:',family)
}).catch(err=>console.log(err))

实现解析:

为了支持util.promisify也都会在函数上定义一个customPromisifyArgs参数,value为回调的多个参数的名称,类型为数组,例如dns.lookup绑定的customPromisifyArgs的value为['address','family'],其主要目的也是为了适配util.promisify

dns.lookup支持util.promisify核心实现

1
2
3
4
5
6
const {customPromisifyArgs} = require('internal/util');

ObjectDefineProperty(lookup,customPromisifyArgs,{
value:['address','family'],
enumerable:false
})

customPromisifyArgs这个参数是从internal/util模块导出的,仅内部调用,在外部util.promisify是没有这个参数的。也就意味着只有Node模块中例如dns.lookup、fs.read等方法在多参数的时候可以使用util.promisify转换为Promise,如果自定义的callback存在多参数的情况,使用util.promisify则不行,不过可以基于util.promisify自己封装一个:

1
2
3
4
module.exports = {
// Symbol used to customize promisify conversion
customPromisifyArgs:kCustomPromisifyArgsSymbol
}
  • 定义Symbol变量kCustomPromisifyArgsSymbol
  • 获取参数名称列表
  • (err,result)改为(err,...values),原先的result仅接受一个参数,改为...values接收多个参数
  • argumentNames存在且value>1,则回调会存在多个参数名称,经常遍历,返回一个obj
  • 否则values最多仅有一个参数名称,即数组values有且仅有一个元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const kCustomPromisifyArgsSymbol = Symbol('customPromisifyArgs');

function promisify(original){
// 获取多个回调函数的函数参数列表
const argumentName = original[kCustomPromisifyArgsSymbol];
function fn(..args){
return new Promise((resolve,reject)=>{
try{
original.call(this,...args,(err,...values)=>{
if(err){
reject(err)
}else{
// argumentNames存在且values>1,则回调会存在多个参数名称,进行遍历,返回一个obj
if(argumentNames !== undefined && values.length > 1){
const obj = {};
for(let i=0;i<argumentNames.length;i+=1){
obj[argumentNames[i]] = values[i];
resolve(obj)
}
}else{
// 否则values最多只有一个参数名称,即数组values有且只有一个元素
resolve(values[0])
}
}
})
}
})
}
return fn;
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 由于kCustomPromiseArgsSymbol使用Symbol声明(每次重新定义都会不一样),且没有对外提供。要实现这个功能,需要每次在cb重新定义kCustomPromisifyArgsSymbol属性

const kCustomPromisifiedSymbol = Symbol('util.promisify.custom');
const kCustomPromisifyArgsSymbol = Symbol('customPromisifyArgs');
mayJunPromisify.custom = kCustomPromisifiedSymbol;

function mayJunPromisify(original){
if(typeof original !== 'function'){
throw new Error('The "original" argument must be of type Function,Received type undefined')
}

if(original[kCustomPromisifiesSymbol]){
const fn = original[kCustomPromisifiesSymbol];
if(typeof fn !== 'function'){
throw new Error('The "util.promisify.custom" property must be of type Function.Received type number')
}
return Object.defineProperty(fn,kCustomPromisifiedSymbol,{
value:fn,
enumerable:false,
writable:false,
configurable:true
})
// 获取多个回调函数的参数列表
const argumentNames = original[kCustomPromisifyArgsSymbol];

function fn(...args){
return new Promise((resolve,reject)=>{
try{
original.call(this,...args,(err,...values)=>{
if(err){
reject)(err);
}else{
// argumentName 存在且 values>1,则回调会存在多个参数名称,进行遍历,返回一个obj
if(argumentNames !== undefined && values.length > 1){
const obj = {};
for(let i=0;i<argumentNames.length;i+=1){
obj[argumentNames[i]] = values[i]
resolve(obj)
}
}else{
// 否则values最多仅有一个参数名称,即数组values有且仅有一个元素
resolve(values[0])
}
}
})
}catch(err){
reject(err)
}
})
}
}
}
module.exports = {
mayJunPromisify,
kCustomPromisifyArgsSymbol
}

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const {kCustomPromisifyArgsSymbol,mayJunPromisify} = require('./may-jun-promisify');
const fs = require('fs');

// mayJunPromisify.custom自定义Promise函数测试
function promisifyCustomTest(){
fs.readFile[mayJunPromisify.custom]=()=>{
return Promise.reject('该文件暂时禁止读取')
}
const readFilePromisify = mayJunPromisify(fs.readFile);
readFilePromisify('text.txt','utf8')
.then(result=>console.log(result))
.catch(err=>console.log(err)))
}

// 自定义cb多参数转换promise
function cbConverPromiseTest(){
function getUserById(id,cb){
const name = 'laibh',
age = 25;
cb(null,name,age);
}
Object.defineProperty(getUserById,kCustomPromisifyArgsSymbol,{
value:['name','age'],
enumerable:false
})

const getUserByIdPromisify = mayJunPromisify(getUserById);
getUserByIdPromisify(1)
.then({name,age}=>{
console.log(name,age);
})
.catch(err=>console.log(err))


}
promisifyCustomTest();
cbConverPromiseTest();

I/O

I/O即Input/Output,输入输出端口,是信息处理系统与外部世界之间的通信,输入手是系统接收的信号或数据,输出的则是从其发送的信号或数据

一次I/O操作分为等待资源,使用资源两个阶段,常见的词网络I/O,磁盘I/O

阻塞与非阻塞I/O

是对于操作系统内核而言的,发生在等待资源阶段,根据发起的I/O请求是否阻塞来判断

阻塞I/O:这种模式下一个用户进程在发起一个I/O操作之后,只有接收到响应或者超时时才可进行处理其他事情,否则I/O将会一直阻塞。以读取磁盘上的一段文件为例子,系统内核在完成磁盘寻道、读取数据、复制数据到内存之中之后,这个调用才算完成。阻塞的这段时间对CPU资源是浪费的。

非阻塞I/O:这种模式下一个用户进程发起一个I/O操作之后,如果数据没有就绪,会立刻返回(标志数据资源不可用),此时CPU时间片可以用来做一些其他事情。

同步与异步I/O

同步与异步I/O发生在使用资源阶段。

同步I/O:应用发送或接受数据后,如果不返回,继续等待(此处发生阻塞),直到数据成功或失败返回。

异步I/O:应用发送或接受数据后立刻返回,数据写入OS缓存,由OS完成数据发送或接收,并返回成功或者失败的信息给应用,NodeJs就是典型异步编程的例子。

用户空间与内核空间

操作系统为了多个应用同时运行,需要保证不同进程相对独立、内核的安全。所以操作系统把内存空间划分为用户空、内核空间两部分。用户空间存放用户程序代码和数据,而内核空间则存放内核代码和数据。

OSI七层模型与网际网协议族图:传输层之上(会话层、表示层、应用层)为用户空间(Web客户端、浏览器、FTP),下四层(传输层,网络层,数据链路层,物理层)为内核空间,例如传输层的TCP/UDP就对应内核空间。

操作系统I/O模型

同步阻塞IO

当进程调用 recvfrom() 函数的时候阻塞,应用程序开始系统调用,在系统内核数据就绪,将数据从内核中拷贝出来后结束。这个过程应用程序都处于等待状态,不能做其他事情,直到将数据拷贝到用户空间或出错才返回,我们称之为阻塞I/O模式。

同步非阻塞I/O

想对于同步非阻塞I/O模式,同步非阻塞I/O在 每次调用之后,如果数据没有就绪就会立即返回,之后重复调用检查I/O操作是否就绪,这对CPU资源非常浪费,直到数据就绪将数据从内核拷贝到用户空间,返回成功指示到应用程序。

I/O多路复用

链接(Socket)并发大的时候,上面的两种就不适合了,前面一个处理不完,后面的就只能干等。多路复用技术先进行select数据就绪后,调用recvfrom进行真正的I/O读写操作。高级之处在于能够一个线程同时处理多个Socket

多路复用中的I/O通常指的是网络I/O,多路指的是多个Socker链接,复用指操作系统进行运算调度最小单位线程,整体的意思就是多个网络I/O复用一个或少量线程来处理Socket

I/O多路复用的四种实现:select/poll/epoll/kqueue

  • select,通过轮询检查在文件描述符上设置的标志位来进行判断,select的轮询相当于在数据库中查找一条记录没有建立索引,对所有的socket进行全部遍历,这对CPU是浪费的。另外select还有一个限制,对于单个线程所能打开的文件描述符最大只有1024,那么基于select的轮询技术最多也只能很好的处理1000并发的吞吐量
  • poll,poll和select在实现上没有什么本质上的区别,poll基于链表来实现,没有了最大链接1024的限制。当文件描述符多了之后,每次调用都会对链接进行线性遍历,性能也是十分低下的。
  • epoll。是linux下效率最高的I/O事件通知机制,没有最大链接限制,通过callback回调通知机制,不再是每次调用对链接进行线性遍历,这样就不会随着文件描述符的增加导致效率下降。1GB内存的机器上大概能监听10w个端口,远超过select1024的限制
  • kqueue,与epoll类似,仅存于FreeBSD(一种类UNIX操作系统)

信号驱动IO

仅在Unix上支持,与I/O多路复用相比避免了select的阻塞轮询,应用程序进行系统调用后立即返回,处理其他事情,在数据就绪之后系统会发送一个SIGIO信号到应用程序,应用程序开始读取数据

异步IO模型

目前最理想形式的一种,应用程序发起系统调用后无需等待直接返回当前调用状态,进行后续的其他任务,结果由内核完成I/O操作之后通过回调通知到应用程序,中间没有阻塞过程。Linux2.6增加了AIO,但是很少系统能够实现

轮询技术Select 与 Epoll的区别

操作方式上

  • select采用了线性遍历来查找,链接多了之后在一个庞大的数组中每次遍历来锁定一个链接,非常消耗性能
  • epoll则不需要遍历,采用的是回调机制,可以看作是一个HashTable,来锁定一个对象非常快。

文件描述符限制

  • 对于文件描述符最大链接数select限制为1024
  • epoll则没有这个限制,通常在1GB内存的机器上所能支持的连接数为10W左右。

操作系统的支持

目前高性能的Web服务器Nginx是基于epoll来实现高并发的

Nodejs中的内存管理和V8垃圾回收机制

在Nodejs中,关于垃圾回收、内存释放不需要像C语言创建一个对象之后需手动创建一个delete/free的一个操作之后进行GC,Nodejs与java一样,由虚拟机进行内存自动管理。

NodeJs中的GC

node.js是基于Chrome v8引擎的javascript运行环境,V8就是虚拟机。

垃圾回收内存管理实践

内存泄露

node提供process.memoryUsage方法来查看当前进程内存使用情况,单位为节

  • ress(resident set size):RAM中保存的进程占用的内存部分,包括代码本身、栈、堆
  • heapTotal:堆中总共申请到的内存量
  • heapUsed:堆中目前用到的内存量,判断内存泄露主要以这个字节为准
  • external:V8引擎内存C++对象占用的内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
*单位字节格式为MB输出
*/

const format = function(bytes)
return (bytes/1024/1024).toFixed(2)+' MB'
}

/*
*封装print方法输出内存占用信息
*/
const print = function(){
const memoryUsage = process.memoryUsage();

console.log(
JSON.stringify({
res:format(memoryUsage.rss),
heapTotal:format(memoryUsage.heapTotal),
heapUsed:format(memoryUsage.heapUsed),
external:format(memoryUsage.external),
})
)
}

内存泄露的例子

堆用来存放对象引用类型,例如字符串、对象、在代码中创建一个Fruit存放在堆中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// example.js
function Quantity(num){
if(num){
return new Array(num * 1024 * 1024)
}
return num;
}
function Fruit(name,quantity){
this.name = name;
this.quantity = new Quantity(quantity);
}

let apple = new Fruit('apple');
print();
let banane = new Fruit('banane',20);
print();
// 执行代码,aplle对象heapUsed使用仅有4.21M,而banana由于quantity属性创建了一个很大数组空间导致heapUsed飙升到164.21M。
// {"rss":"19.94 MB","heapTotal":"6.83 MB","heapUsed":"4.21 MB","external":"0.01 MB"}
// {"rss":"180.04 MB","heapTotal":"166.84 MB","heapUsed":"164.24 MB","external":"0.01 MB"}

手动执行垃圾回收内存释放

1
2
3
4
5
6
banana = null;
global.gc();
print();
// 执行 node --expose-gc xxx.js --expose-gc参数表示运行手动执行垃圾回收机制,将banana对象赋值null进行GC
// {"rss":"52.48 MB","heapTotal":"9.33 MB","heapUsed":"3.97 MB","external":"0.01 MB"}
// heapUsed的使用已经降了下来

V8垃圾回收机制

垃圾回收指的是回收那些在应用程序中不再引用的对象,当一个对象无法从根节点访问这个对象就会作为垃圾回收的候选对象。这里的根对象可以为全局对象、局部变量,无法从根节点访问指的也就是不会再被其他活动对象所引用。

V8堆内存限制

在V8中限制64位机制大约为1.4G,32位的大概是0.7G,对于一些大内存的操作需要谨慎否则超出V8内存限制会造成进程退出

内存溢出边界的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// overflow.js
const format = function(bytes){
return (bytes / 1024 / 1024).toFixed(2)+' MB';
}

const print = function(){
const memoryUsage = process.memoryUsage();
console.log(
`heapTotal:${format(memoryUsage.heapTotal)},
heapUsed: ${format(memoryUsage.heapUsed)}`
)
}

const total = [];
setInterval(()=>{
total.push(new Array(20*1024*1024)) // 大内存占用
print();
},1000)

total为全局变量每次增长大概在160M左右且不会被回收,在接近V8边界时无法分配内存导致进程内存溢出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ node overflow.js
heapTotal: 166.84 MB, heapUsed: 164.23 MB
heapTotal: 326.85 MB, heapUsed: 324.26 MB
heapTotal: 487.36 MB, heapUsed: 484.27 MB
heapTotal: 649.38 MB, heapUsed: 643.98 MB
heapTotal: 809.39 MB, heapUsed: 803.98 MB
heapTotal: 969.40 MB, heapUsed: 963.98 MB
heapTotal: 1129.41 MB, heapUsed: 1123.96 MB
heapTotal: 1289.42 MB, heapUsed: 1283.96 MB

<--- Last few GCs --->

[87581:0x103800000] 11257 ms: Mark-sweep 1283.9 (1290.9) -> 1283.9 (1290.9) MB, 512.1 / 0.0 ms allocation failure GC in old space requested
[87581:0x103800000] 11768 ms: Mark-sweep 1283.9 (1290.9) -> 1283.9 (1287.9) MB, 510.7 / 0.0 ms last resort GC in old space requested
[87581:0x103800000] 12263 ms: Mark-sweep 1283.9 (1287.9) -> 1283.9 (1287.9) MB, 495.3 / 0.0 ms last resort GC in old space requested


<--- JS stacktrace --->

v8提供了两个参数仅在启用阶段调整内存限制大小,分别为调整老生代、新生代:

  • –max-old-space-size=2048
  • –max-new-space-size=2048

内存不是越大越好,一方面是服务器资源昂贵,另外是V8以1.5G的堆内存进行一次小的垃圾回收大约需要50毫秒以上时间,会导致JavaScript进程暂停,这也是最主要的一方面。

新生代与老生代

新生代空间

由于新空间的垃圾回收机制很频繁,所以处理方式必须非常快,采用Scavenge算法,这是一种复制算法,新生代空间会被一分为二划分为两个相等大小的from-space和to-space工作方式是将from space中存活的对象复制出来,然后移动它们到to space 中或者被提升到老生代空间中,对于from space中没有存活的对象将会被释放,完成这些复制后再将from space和to space进行互换。

Scavenge算法适用少量内存的垃圾回收,但是有很大的空间开销,对于新生代少量内存是可以接受的

老生代空间

新生代空间在垃圾回收满足于一定的条件(是否经过Scavenge空间、to space内存占比)会被晋升到老生代空间中,在老生代空间中的对象都已经至少经历了一次或者多次的回收所以它们的存活概率会更大。在使用Scavenge算法会有两个缺点,一是将会重复的复制存活对象使得效率低下,二是对空间资源的浪费,所以在老生代空间中采用了 Mark-Sweep(标记清除)和Mark-Compact(标记整理)算法

Mark-Sweep处理时分为标记、清除两个步骤,与Scavenge算法只复制活对象相反的是在老生代空间中由于活对象占多数Mark-Sweep在标记阶段遍历堆中的所有对象仅标记活对象把未标记的死对象清除,这时一次标记清除就已经完成了。有一个问题是被清除的对象遍布于各内存地址,产生很多内存碎片

Mark-Compact(标记整理算法)为了解决内存碎片问题,在其工作过程中将活着的对象往一端移动,这时内存空间是紧凑的,移动完成之后,直接整理边界之外的内存。

小结

V8使用了不同的垃圾回收算法Scavenge/Mark-Sweep/Mark-Compact.这三种垃圾回收算法都避免不了在进行垃圾回收时需要将应用程序暂停,待垃圾回收完成之后在恢复应用逻辑,对于新生代空间来说由于很快所以影响不大,但是对于老生代空间由于存活对象较多,停顿还是会造成影响的,因此V8又新增了增量标记的方式减少停顿时间。

内存泄露

内存泄露(Memory Leak)是指程序中已动态分配的堆内存由于某种原因程序未释放或无法释放,造成系统内存的浪费,导致程序运行速度减慢甚至系统奔溃等严重后果。

全局变量,未声明的变量或挂在全局global下的变量不会自动回收,将会常驻内存直到直到进程退出才会释放,除非通过delete或者重新赋值为undefined/null解决之间的引用关系,才会被回收。

闭包,也是一个常见的内存泄露问题,闭包会引用父级函数中的变量,如果闭包得不到释放,闭包引用的父级变量也不会释放从而导致内存泄露

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var theThing = null;
var replaceThing = function(){
var originalThing = theThing;
var unused = function(){
if(originalThing){
console.log('hi')
}
theThing = {
longStr:new Array(1000000).join('*'),
someMethod:function(){
console.log(someMessage)
}
}
}
}
setInterval(replaceThing,1000)

代码运行时,每次执行replaceThing方法都会生成一个新的对象,但是之前的对象没有释放导致的内存泄露。

慎将内存作为缓存

通过内存来做缓存是最快的实现方式,缓存中的存储的键越多,长期存活的对象就越多,垃圾回收时将这些对象做无用功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 下面一个获取用户Token的例子,memoryStore对象会随着用户数的增加而增长,当启动多个线程或者部署在多台机器会造成每个进程都保存一份,显然是资源的浪费,最好是通过Redis做共享
const memoryStore = new Map();

exports.getUserToken = function(key){
const token = memoryStore.get(key);
if(token && Date.now() - token.now > 2 * 60){
return token;
}

const dbToken = db.get(key);
memoryStore.set(key,{
now:Date.now(),
val:dbToken
});
return token;
}

模块私有变量内存常驻

加载一个模块代码之前,Nodejs会使用一个如下的函数封装器将其封装,保证了顶层的变量(var,const,let)在模块范围内,而不是全局对象。这个时候就会形成一个闭包,在require时会被加载一次,将exports对象保存在内存中,直到进程退出才会回收,这个将会导致的是内存常驻,所以避免一些没必要的模块加载,否则也会造成内存增加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
(function(exports,require,module,__filename,__dirname){
// 模块的代码实际上在这里
})
// 所以建议对模块的引用仅在头部初次加载之后用const缓存起来,而不是在使用时每次都去加载一起。
// 推荐
const a = require('a.js');
function test(){
a.run();
}

// 不推荐
function test(){
require('a.js').run();
}

事件反复监听

NodeJs中对一个事件反复监听则会报下面的错误,实际上使用的EventEmitter类,包含一个listeners数组,默认为10个监听器超出这个数则会报警,用于发现内存泄露,也可以通过emitter.setMaxListeners()方法为指定的EventEmitter实例修改限制

1
MaxListenersExceededWarning:Possible EventEmitter memory leak detected, 11 connect added.Use emitter.setMaxListeners() to increase limit

其他注意事项

使用定时器setInterval时记得使用对应的clearInterval进行清除。因为setInterval执行完之后会返回一个值且不会自动释放,另外还有map/filter等对数组进行操作,每次操作之后都会创建一个新的数组,将会占用内存,如果单纯的遍历map可以用forEach。

插件

缓存

定时任务

模板引擎

框架

ORM

性能

指标

QPS

QPS(Query Per Second)指每秒查询量,规定时间内所能处理的流量大小,通常QPS值越大服务器的吞吐量也就越大,相对服务器负荷也会越高

QPS=并发量/平均响应时间并发量 = QPS * 平均响应时间

TPS

TPS(TransactionPerSecond)指每秒事物处理量,每秒钟系统所能处理的交易或事务的数量,用来形容系统的性能。

两者区别

一次下单请求,访问一次创建接口产生一次TPS,对于服务器的请求可能会产生多次,比如查询用户地址信息、商品数据信息、商品报价信息,这些请求计入QPS,也就是产生了3次QPS

系统扩容评价

根据二八法则来评估系统扩容需要多少台机器,二八法则即20%的时间承载80%的流量,把20%的时间称为峰值时间,换算公式:

1
2
(总PV数 * 80%) / (每天描述 * 20%) = 峰值时间每秒请求数
峰值时间内每秒请求数(QPS)/单台机器QPS = 需要的机器

假设有1000wPV,总共需要的QPS为多少?

1
(1000000 * 0.8) / (24 * 60 * 60 * 0.2) = 463(QPS)

假设每台机器支撑100QPS,则共需要的机器为

1
463(总有QPS)/100(单机QPS) = 5(约需要5台机器)

Nodejs CPU使用率

os.cups()数据指标

返回的对象数组中有一个times字段,包含了user/nice/sys/idle/irq几个指标数据,分别代表CPU在用户模式、良好模式、系统模式、空闲模式、中断模式下话费的毫秒数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
[
{
model: 'Intel(R) Core(TM) i5-4590 CPU @ 3.30GHz',
speed: 3292,
times: {
user: 151474100,
nice: 0,
sys: 95311197,
idle: 2258535287,
irq: 2136605
}
},
{
model: 'Intel(R) Core(TM) i5-4590 CPU @ 3.30GHz',
speed: 3292,
times: {
user: 84900939,
nice: 0,
sys: 65104926,
idle: 2355314423,
irq: 874869
}
},
{
model: 'Intel(R) Core(TM) i5-4590 CPU @ 3.30GHz',
speed: 3292,
times: {
user: 144337320,
nice: 0,
sys: 112618416,
idle: 2248364428,
irq: 699602
}
},
{
model: 'Intel(R) Core(TM) i5-4590 CPU @ 3.30GHz',
speed: 3292,
times: {
user: 110066333,
nice: 0,
sys: 84988268,
idle: 2310265437,
irq: 897333
}
}
]

定义方法getCPUInfo用来获取系统CPU信息,提供了CPU利用率的实时监控,这个实时不是绝对的实时,会有时差,下面实现中默认设置的1秒钟,可以通过Options.ms进行调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
const os = require('os');
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));


class OSUtils {
constructor() {
this.cpuUsageMSDefault = 1000; // CPU 利用率默认时间段
}

/**
* 获取CPU信息
* @return {Object} CPU信息
*/
_getCPUInfo() {
const cpus = os.cpus();
let user = 0,
nice = 0,
sys = 0,
idle = 0,
irq = 0,
total = 0;

for (let cpu in cpus) {
const times = cpus[cpu].times;
user += times.user;
nice += times.nice;
sys += times.sys;
idle += times.idle;
irq += times.irq;
}
total += user + nice + sys + idle + irq;

return {
user,
sys,
idle,
total
}
}

/**
* 获取某时间段CPU利用率
* @param {Number} Options.ms [时间段,默认是1000ms,即1秒钟]
* @param {Boolean} Options.percentage [true(以百分比结果返回)| false]
* @return {Promise}
*/
async getCPUUsage(options = {}) {
const that = this;
let { cpuUsageMS, percentage } = options;
cpuUsageMS = cpuUsageMS || that.cpuUsageMSDefault;
const t1 = that._getCPUInfo(); // t1时间点的 CPU信息
await sleep(cpuUsageMS);

const t2 = that._getCPUInfo(); // t2时间点的 CPU信息
const idle = t2.idle - t1.idle;
const total = t2.total - t1.total;
let usage = 1 - idle / total;

if (percentage) usage = (usage * 100.0).toFixed(2) + '%';
return usage;
}

}

使用方式:

1
2
3
4
5

const osUtils = new OSUtils()
osUtils.getCPUUsage({ percentage: true }).then(cpuUsage => {
console.log('CPU 利用率', cpuUsage)
});

NodeJs与Event Loop

node之前的世界,多线程服务器中,Web应用程序是用一个client/server(客户端/服务端)模式所编写的,其中client将向server请求资源并且将会根据这个资源以响应,server仅在client请求时做出响应,并在每次响应后关闭连接。这种模式是有效的,因为对服务器的每一个请求都需要时间和资源(内存、CPU等等),服务器必须完成上一个请求,才能接受下一个请求。需要同时处理N个请求,服务器就需要N个线程,如果有N+1个请求,就必须等N个线程中的任何一个可用。

服务器中的线程不是唯一的问题,一个线程为什么不能同时处理2个或者更多请求,是因为阻塞了Input/Output操作。

操作 CPU时钟周期数(ticks)
CPU寄存器 3
L1 Cache(一级缓存) 8
L2 Cache(二级缓存) 12
RAM(随机存取存储器) 150
Disk(磁盘) 30 000 000
NetWork(网络) 250 000 000

时钟周期也称为tick/clock cycle/clock period等,指一个硬件在被使用过程中,被划分为多个时间周期,当我们需要比较不同硬件性能时,就在不同硬件上测试同一个软件,观察它们的时钟周期时间和周期指数,如果时钟周期越长、周期越多,就意味着这个硬件需要的性能较低。

Event Loop事件循环

事件循环实际上是一个无限循环,并且线程里唯一可用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void StartNodeInstance(void* arg){
// ...
{
SealHandleScope seal(isolate);
bool more;
do{
v8::platform::PumpMessageLoop(default_platform,isolate);
more = uv_run(env->event_loop(),UV_RUN_ONCE);
if(mode == false){
v8::platform::PumpMessageLoop(default_platform,isolate);
EmitBeforeExit(env);

more = uv_loop_alive(env->event_loop());
if(uv_run(env->event_loop),UV_RUN_NOWAIT)!=0){
more = true
}
}
}while(more == true)
}
}

事件循环经历6个阶段,所有阶段的执行被称为tick

1
timers->pending callbacks ->idle,prepare->poll->check->close callbacks
  • timers:这个阶段执行定时器setTimeout和setInterval的回调函数
  • pending callbacks:几乎所有的回调都在这里执行,除了close回调,定时器timers阶段的回调和setImmediate
  • idle,prepare:仅在内部使用
  • poll:检索新的I/O事件,适当时Node在此处阻塞
  • check:setImmediate回调函数将在这里执行
  • close callbacks:一些准备关闭的回调函数,如 socket.on(‘close’,…)

当Event loop需要执行I/O操作时,它将从一个池(通过Libuv库)中使用系统线程,当这个作业完成时,回调将排队等待在pending callbacks阶段被执行

CPU密集型任务问题

设定一个函数,给定一个数,计算在一个数组中返回N个质数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// primes.js
function isPrime(n) {
for (let i = 2, s = Math.sqrt(n); i <= s; i++)
if (n % i === 0) return false;
return n > 1;
}

function nthPrime(n) {
let counter = n;
let iterator = 2;
let result = [];

while (counter > 0) {
isPrime(iterator) && result.push(iterator) && counter--;
iterator++;
}
return result;
}

module.exports = { isPrime, nthPrime };

const http = require('http');
const url = require('url');
const primes = require('./primes');

// index.js
http.createServer((req, res) => {
const { pathname, query } = url.parse(req.url, true);
if (pathname === '/primes') {
const result = primes.nthPrime(query.n || 0);
res.setHeader('Content-Type', 'application/json');
res.write(JSON.stringify(result));
res.end()
} else {
res.statusCode = 404;
res.write('Not Found');
res.end();
}
}).listen(9898);
// index.js 创建一个服务并在每次请求 /primes这个库,通过query传递参数,假设有3个客户端访问这个非阻塞API,第一个页面请求5个质数,第二个1000个,第三个100000000个,会发现,第三个请求时,客户端就会被阻塞,因为质数会占用大量的CPU,主线程忙于执行密集型的代码,将无法做其他事情,还有Node引入了工作线程

工作线程

工作线程对于执行CPU密集型的javascript操作非常有用,在I/O密集型的工作中用途不大,NodeJs内置的异步I/O操作比工作线程效率更高

修改代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
const { workerData, parentPort } = require('worker_threads');

function isPrime(n) {
for (let i = 2, s = Math.sqrt(n); i <= s; i++)
if (n % i === 0) return false;
return n > 1;
}

function nthPrime(n) {
let counter = n;
let iterator = 2;
let result = [];

while (counter > 0) {
isPrime(iterator) && result.push(iterator) && counter--;
iterator++;
}
return result;
}

parentPort.postMessage(nthPrime(workerData.n))

// index.js
const http = require('http');
const url = require('url');
const { Worker } = require('worker_threads')


http.createServer((req, res) => {
const { pathname, query } = url.parse(req.url, true);
if (pathname === '/primes') {
const worker = new Worker('./primes.js', {
workerData: {
n: query.n || 0
}
})

worker.on('error', () => {
res.statusCode = 500;
res.write('Oops there was an error');
res.end();
})

let result;

worker.on('message', (message) => {
result = message;
})

worker.on('exit', () => {
res.setHeader('Content-Type', 'application/json');
res.write(JSON.stringify(result));
res.end()
})
} else {
res.statusCode = 404;
res.write('Not Found');
res.end();
}
}).listen(9898);
// index.js在每个请求中创建一个Worker实例,在一个工作线程中加载并执行primes文件,当这个质数列表计算完成买这个message将会被触发,接受信息并赋值result,由于这个job已完成,将会再次触发exit事件,允许主线程发送数据到客户端
// primes导入 workerData(从主线程传递参数),parentPort是向主线程发送消息的方式
// 主线程将不会再阻塞,但是生成工作线程并不是最佳实践,创建新线程并不便宜,一定要先创建一个线程池

Redis

Redis是一个免费开源、基于内存的高性能Key-Value数据库,具有速度快,数据持久化,多语言和多功能等特性。

速度快:10w OPS能力,单线程模型,数据存于内存中,注意一次只能运行一次命令,使用过程拒绝使用慢命令,例如keys/flushall/flushdb/slow lua script/mutil/exec等

持久化:基于内存模型,断电后或者服务重启会造成数据丢失,针对这个问题,提出两种数据持久化策略,分别为RDB和AOF,会将Redis在内存中的数据异步更新到磁盘中,实现数据的持久化功能。

丰富的数据类型:除了常见的String/HashTable/List/Set/Zset之外还有BitMaps(位图)、HyperLogLog(超小内存唯一计数)

多语言:基于TCP的通信方式,支持Node.Js/Python/Java/Ruby/Lua等。

多功能:提供了发布订阅、简单的事务功能、pipeline提高客户端并发效率。另外在Redis中所有单个命令操作都是原子性的,如果想对多个命令一起操作,可以借助Lua脚本(实现自定义命令,保证原子性)

通用命令

  • keys * :遍历所有key,生产环境不建议使用,时间复杂度O(n)
  • dbsize key:计算key的总数,Redis内置了这个计数器,会实时更新key的总数,时间复杂度O(1)
  • exists key:检查key是否存在,时间复杂度为O(1)
  • expire key seconds:key在指定second后过期,时间复杂度O(1)
  • ttl key:key剩余的过期时间,时间复杂度O(1)
  • persist key: 去掉key的过期时间,时间复杂度O(1)
  • type key:查看key的类型,时间复杂度O(1)

数据结构

字符串

最大限制512MB,适用于缓存、计算器、分布式锁等,字符串类型的值可以为简单的字符串、JSON、XML、数组甚至是二进制(视频)

常见命令
命令 含义 时间复杂度
set get del 对key进行设置、读取、删除 O(1)
incr decr 计数 O(1)
incrby decrby 对计数设置增量 O(1)
setnx key存在不做任何操作 O(1)
setex key存在做操作与setnx相反 O(1)
getset 设置新值返回旧值 getset key newValue O(1)
mset mget 多个key进行设置、读取 O(1)

set

1
2
3
4
5
set key value [ex seconds] [px milliseconds] [nx|xx]
// seconds:单位(秒)
// milliseconds:单位(毫秒)
// nx:key 存在不做任何操作,等价于setnx
// xx:key 存在做操作与nx相反,相当于setex

mget mset

mget/mset可以批量获取或设置值,如果使用get多次读取数据等价于n次网络时间+n次命令时间,这种方法可以用mget优化,等价于1次网络时间+n次命令时间,这是一个O(n)操作,避免命令过多客户端阻塞

1
2
3
4
5
6
127.0.0.1:6379> mset key1 val1 key2 val2 key3 val3
OK
127.0.0.1:6379> mget key1 key2 key3
1) "val1"
2) "val2"
3) "val3"

incr decr incrby decrby

  • incr:自增
  • decr:自减
  • incrby:指定数字自增
  • decrby:指定数字自减
  • incrbyfloat:指定浮点数自增
1
2
3
4
5
incr key
decr key
incrby key increment
decrby key decrement
incrbyfloat key increment
应用场景

缓存,对城市列表数据进行缓存

1
2
3
4
5
6
7
8
9
10
11
// 伪代码
function cityList(){
const redisKey = 'city';
let cities = redis.get(redisKey)

if(!cities){
cities = mongo.getCityList();
redis.set(redisKey,JSON.stringify(cities));
}
return cities;
}

分布式锁

1
set key value [EX seconds] [PX milliseconds] [NX|XX]

计数器,网站的PV/UV统计,文章点赞、阅读量,视频网络的播放量,Redis提供的incr命令可实现计数器功能,性能好复杂度为O(1)

1
2
3
4
const incrPageViewsCounter = pageId =>{
const key = `page:views:${pageId}`;
return redis.incr(key);
}

Session存储,Redis不会因为服务器重启导致Session数据丢失,具有数据持久化功能。

限流,短信发送为了避免接口被频繁调用,通常要在指定时间内避免重复发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const SMSLimit = async phone =>{
const key = `sms:limit:${phone}`;
const result = await redis.set(key,1,'EX',60,'NX');

if(result === null){
console.log('60s 内无法再次发送验证码')
return false
}

console.log('可以发送')
return true;
}

SMSLimit(18800000000)

哈希

哈希结构有一个特点,所有命令都是H开头,hash类型其值本身就由一个或多个filed-value构成

1
2
3
4
5
6
7
hashKey = {
filed1:value1,
filed2:value2
}
// 优:节省空间,可以部分更新
// 缺:不支持TTL设置,Redis中过期时间只针对顶级Key,无法对Hash Key的field设置过期时间,只能对整个Key 通过expire设置
// 注意:在使用hgetall的时候注意,集合很大将会浪费性能
常见命令
命令 含义 时间复杂度
hset 对key的field进行设置 O(1)
hget 获取key指定的field O(1)
hdel 删除key指定的field O(1)
hincrby hincrbyfloat 类似于incrby incryfloat 这个操作的是field O(1)
hmset hmget 对多个filed value 设置 读取 O(n)
hgetall 获取key的所有field value O(n)
hvals 获取key的所有value O(n)
hkeys 获取key的所有fields O(n)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
127.0.0.1:6379> hset student name Jack
(integer) 1
127.0.0.1:6379> hget student name
"Jack"
127.0.0.1:6379> hmset student age 18 sex man
OK
127.0.0.1:6379> hmget student sex age
1) "man"
2) "18"
127.0.0.1:6379> hgetall student
1) "name"
2) "Jack"
3) "age"
4) "18"
5) "sex"
6) "man"
应用场景

hash适合将一些数据存储在一起,例如缓存用户信息,与字符串不同的是,hash可以对用户信息结构中的每个字段单独存储,当需要获取信息时可以仅获取需要的部分字段。如果用字符串存储,两种方式,一种是将信息拆分为多个键(每个属性一个键)来存储,占用过的key同时占用空间;另外一种方式是序列化字符串存储,这种方式如果去数据只能全部取出并且还要进行反序列化,序列化/反序列化也有一定的内存开销

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 缓存用户信息例子:
// 模拟查询Mongo数据
const mongo = {
getUserInfoByUserId:userId =>{
return{
name:'Jack',
age:19
}
}
}

// 获取用户信息
async function getUserInfo(userId){
const key = `user:${userId}`;
try{
// 从缓存获取数据
const userInfoCache = await redis.hgetall(key);

// 如果 userInfoCache为空,返回值为{}
if(Object.keys(userInfoCache).length ===0){
const userInfo = mongo.getUserInfoByUserId(userId);
await redis.hmset(key,userInfo);
await redis.expire(key,120);
return userinfo;
}
return userInfoCache;
}catch(err){
console.error(err);
throw err;
}
}

getUserInfo(1)

列表

Redis的列表用来存储字符串元素的集合,基于Linked Lists实现,意味着插入、删除操作非常快,时间复杂度为O(1),索引很慢,时间复杂度为O(n)

Redis列表命令都是L开头,在实际运用中可以作为队列或者栈

  • Stack(栈):后进先出,实现命令lpush+lpop
  • Queue(队列):先进先出,实现命令lpush+rpop
  • Capped Collection(有限集合):lpush+ltrim
  • Message Queue(消息队列):lpush+brpop
常见命令
命令 含义 时间复杂度
lpush rpush 列表左端/右端插入一个值 O(1~n)
linsert 列表指定的值前/后插入新值 O(n)
lpop rpop 列表左侧或者右侧弹出一个值 O(1)
blpop brpop lpop rpop 的阻塞版本,需设置timeout O(1)
ltrim 按照索引范围修剪列表 O(n)
lrange 获取指定范围内的元素列表 O(n)
llen 获取列表长度 O(1)
lset 设置列表指定索引值为新值 O(n)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 列表左侧加入三个元素
127.0.0.1:6379> lpush languages JavaScript Python Go
(integer) 3

# 获取列表长度
127.0.0.1:6379> llen languages
(integer) 3

# 获取指定范围内元素列表 从左到右 start/end->0/N-1 从右到左 start/end->-1/-N
127.0.0.1:6379> lrange languages 0 2
1) "Go"
2) "Python"
3) "JavaScript"

# 列表右侧插入元素
127.0.0.1:6379> rpush languages TypeScript
(integer) 4

# 查看列表的元素
127.0.0.1:6379> lrange languages 0 3
1) "Go"
2) "Python"
3) "JavaScript"
4) "TypeScript"

# 列表左端移除一个元素
127.0.0.1:6379> lpop languages
"Go"

# 列表右端移除一个元素
127.0.0.1:6379> rpop languages
"TypeScript"

# 谁的那个列表指定索引值为新值
127.0.0.1:6379> lset languages 1 JS
OK

# 列表指定的值前/后插入新值
127.0.0.1:6379> linsert languages after JS Nodejs
(integer) 3

# 按照索引范围修剪列表(元素截取)
127.0.0.1:6379> ltrim languages 1 2
OK
应用场景

消息队列,Redis List结构的lpush与brpop命令可实现消息队列,lpush命令是从左端插入数据,brpop命令是从右端阻塞式的读取数据,阻塞读过程中如果队列中没有数据,会立即进入休眠直到数据到来或超过设置的timeout时间,会立即醒过来

1
2
3
4
5
6
7
8
9
async function test(){
const key = 'languages';
// 阻塞读,timeout为5秒钟
const result = await redis.brpop(key,5);
console.log(result);
}

test();
test();

集合

Redis集合类型可用来存储多个字符串元素,和列表不同,集合元素不允许重复,集合中的元素是无须的,也不能通过索引下标获取元素。Redis集合的命令都是S开头

常用命令
命令 含义 时间复杂度
sadd 集合中添加元素,如果元素重复则添加失败 O(1)
srem 删除集合中的元素 O(1)
scard 计算集合中的元素个数 O(1)
sismember 判断集合中是否存在指定元素 O(count)
srandmember 随机从集合中返回指定元素 O(n)
sinter 求集合交集
sunion 求集合并集
sdiff 求集合差集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 集合中添加元素
127.0.0.1:6379> sadd languages2 Nodejs JavaScript
(integer) 2

# 计算集合中元素个数
127.0.0.1:6379> scard languages2
(integer) 2

# 判断集合中是否存在指定元素 1 存在 0不存在
127.0.0.1:6379> sismember languages2 Nodejs
(integer) 1

127.0.0.1:6379> sismember languages2 Nodejs1
(integer) 0

# 随机从集合中返回n个元素
127.0.0.1:6379> srandmember languages2 2
1) "JavaScript"
2) "Nodejs"

127.0.0.1:6379> srandmember languages2 3
1) "JavaScript"
2) "Nodejs"

# 设置用户1 使用的语言
127.0.0.1:6379> sadd user:1 Nodejs JavaScript
(integer) 2

# 设置用户2 使用的语言
127.0.0.1:6379> sadd user:2 Nodejs Python
(integer) 2

# 求 user:1与 user:2 交集
127.0.0.1:6379> sinter user:1 user:2
1) "Nodejs"

# 求user:1 与 user:2 并集
127.0.0.1:6379> sunion user:1 user:2
1) "JavaScript"
2) "Nodejs"
3) "Python"

# 求user:1 与 user:2 差集
127.0.0.1:6379> sdiff user:1 user:2
1) "JavaScript"
应用场景

抽奖,Redis的结合有去重功能,一些抽奖类项目中可以存储中奖的用户Id,能够保证同一个用户Id不会中奖两次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async function test(userId){
const key = 'luck:users';
const result = await redis.sadd(key,userId);

// 如果元素存在,返回0表示未添加成功
if(result === 0){
console.log('您已中间一次,无法再次参与');
return false
}
console.log('恭喜您中奖')
return true;
}

test(1);

计算用户共同感兴趣的商品,sadd与sinter可以用来统计用户共同感兴趣的商品,sadd保存每个用户喜欢的商品标签,使用sinter对每个用户感兴趣的商品标签求交集

有序集合

Redis有序集合zset保留了集合set元素不能重复的特征之外,在有序集合的元素中是可以排序的,与列表使用索引下标不同的是有序集合是有序集合给每个元素设置一个分值(score)作为排序的依据

Redis有序集合是Z开头的

常见命令
命令 含义 时间复杂度
zadd 集合中添加元素 O(logN)
zrem 集合中删除元素 O(1)
zscore 返回集合的分数 O(1)
zincrby 增加或者减少集合的分数 O(1)
zcard 返回元素的个数 O(1)

zadd

1
2
3
4
5
6
zadd key [NX|XX] [CH] [INCR] score member [sroce member ...]
# [NX|XX]:NX member必须不存在才添加成功,用户Create;XX,member 必须存在才能更新成功,用于UPDATE
# [CH]:返回此次操作后有序集合元素和分数发生的变化
# [INCR]:对score做增加,相当于 zincrby
# score:代表分数(排序)
# member:成员
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 有序集合 grages 中添加3个元素
127.0.0.1:6379> zadd grades NX 80 xiaoming 75 xiaozhang 85 xiaoli
(integer) 3

# 查看成员 xiaozhang 分数
127.0.0.1:6379> zscore grades xiaozhang
"75"

# 更新成员 xiaozhang 分数
127.0.0.1:6379> zadd grades XX 90 xiaozhang
(integer) 0

# 再次查看成员xiaozhang分数
127.0.0.1:6379> zscore grades xiaozhang
"90"

# 查看成员排名
# 分数从低到高
127.0.0.1:6379> zrank grades xiaozhang
(integer) 2

# 分数从高到低
127.0.0.1:6379> zrevrank grades xiaozhang
(integer) 0

# 增加成员分数
127.0.0.1:6379> zincrby grades 5 xiaozhang
"95"

# 返回指定范围成员排名,WITHSCORES可选参数,去掉则不返回分数
127.0.0.1:6379> zrange grades 0 2 WITHSCORES
1) "xiaoming"
2) "80"
3) "xiaoli"
4) "85"
5) "xiaozhang"
6) "95"

# 返回指定分数范围内的成员列表
127.0.0.1:6379> zrangebyscore grades 85 100
1) "xiaoli"
2) "xiaozhang"

# 删除指定成员
127.0.0.1:6379> zrem grades xiaoli
(integer) 1

高级特性

慢查询

Redis整个生命周期:发送命令->排队->执行命令->返回结果,慢查询通常发生在执行命令阶段,可以通过日志查询系统slowlog进行问题定位跟踪

在配置文件中redis.conf设置:

  • slowlog-max-len:表示慢查询最大的条数,默认128,保存在内存中,当超过预先设置的值后会将最早的slowlog删除,是个先进先出队列
  • slow-log-slower-than:慢查询阀值,默认10000微妙,只有命令执行时间大于该阀值才会被slowlog记录,如果记录所有命令将阀值设置为0

Redis是每秒万级别,设置阀值时候,默认为10000微秒(10毫秒),不要设置太大,建议1毫秒之下,才有意义。定期将慢查询持久化到其他数据库,便于排查。

慢查询命令:

  • slowlog get [n]:获取慢查询队列
  • slowlog len:获取慢查询队列长度
  • slowlog reset:清空慢查询队列

pipeline

核心1次网络请求处理n条命令,redis本身命令处理时间是微秒级别,pipeline主要减少网络传输要求

发布订阅

角色:发布者(publisher)/订阅者(subscriber)/频道(channel):

  • 发布消息publish channel message
  • 订阅消息 subscribe [channel],可以订阅多个频道
  • 取消订阅 unsubscribe [channel]

BitMaps(位图)

CEO

用于地理位置定位,基于zset实现的:

  • geoadd key longitude latitude member增加地理位置信息,例如:geoadd cities: 163.31 39.99 beijing
  • geopos key member [member...],获取地理位置消息,例如geopos cities: beijing
  • geodist key member1 member2 [unit]:获取两地址位置距离,unit为单位(m/km/mi/ft),例如:geodist cities: beijing shanghai km

主从复制

单机带来的问题机器故障、容量限制、QPS瓶颈,主从复制是一种一主多从的模式提供了数据副本,解决了单机带来的机器故障问题,另外主从分离模式还提供了Redis读的性能,也是高可用,分布式的基础。

所谓的主从复制就是一个Redis主节点拥有多个从节点,由主节点的数据单向复制到从节点,在一些读多写少的业务场景非常受用

数据持久化

Redis数据存储都是内存里,对数据的更新异步的存储在磁盘里,在Redis中的数据持久化有两种策略,RDB快照,AOF日历

RDB

把当前内存中的数据集快照写入磁盘,恢复时将快照文件直接读到内存里。

触发机制

save和bgsave生成快照

save:使用save会造成客户端阻塞,它使用一种同步的方式生成RDB快照文件,因为Redis是单线程,如果save过程很长也会阻塞其他客户端的命令,在生产中是不建议使用的

bgsave:使用bgsave Redis会fork一个子进程来负责生成RDB文件,由于bgsave是异步进行的并不会阻塞其他客户端的命令,bgsave模式下阻塞阶段发生在fork过程中

持久化配置

其中stop-writes-on-bgsave-error这个配置很重要,如果子进程(也就是备用经常)在后台生成快照失败,主经常会停止新的写入操作,也是为了保持数据一致性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 持久化默认时间策略
save 900 1 # 如果仅有1-9次更改操作,那么要900s才写入硬盘一次
save 300 10 # 如果仅有10-9999次更改操作,那么要300s才写入硬盘一次
save 60 10000 # 如果超过10000次更改操作,那么60s才会写入硬盘一次

# RBD 文件名称,建议 dump-6379.rdb
dbfilename dump.rdb

# 工作目录(上面 dump-6379.rdb 文件保存目录)
dir /data/soft/redis-5.0.5/data/

# 备份进程出错,主进程停止写入
stop-writes-on-bgsave-error yes

# 是否压缩
rdbcompression yes

# 导入时是否检查
rdbchecksum yes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
127.0.0.1:6379> save
OK
127.0.0.1:6379> bgsave
Background saving started

# 服务端
[35856] 02 Jul 10:01:02.999 * 1 changes in 3600 seconds. Saving...
[35856] 02 Jul 10:01:03.063 * Background saving started by pid 58172
[35856] 02 Jul 10:01:03.263 # fork operation complete
[35856] 02 Jul 10:01:03.263 * Background saving terminated with success
[35856] 02 Jul 10:30:56.240 * DB saved on disk
[35856] 02 Jul 10:31:03.565 * Background saving started by pid 53904
[35856] 02 Jul 10:31:03.705 # fork operation complete
[35856] 02 Jul 10:31:03.705 * Background saving terminated with success
不足

性能、时间耗时,存在不稳定性

RDB生成的过程就是将Redis内存中的dump到硬盘生成一个RDB文件,其实就是生成一个内存快照,save会造成阻塞,不建议生产环境去使用。besave的fork也是一个重量级操作,遵循copy-on-write(写入时复制)策略,新fork出的子进程会继续共享父进程的物理空间,使用COW技术可以避免不必要的资源分配,父进程的代码段和只读数据段都不允许修改,所以无需复制,当父进程处理些请求时会把要修改的页创建副本,而子进程在fork过冲中会共享父进程的内存快照

Copy on Write技术实现原理:fork之后,kernel把父进程中所有的内存页的权限都设为read-only,然后子进程的地址空间指向父进程。当父子进程都只读内存时,相安无事。当其中某个进程写入内存时,CPU硬件检测到内存页是read-only,于是触发页异常中间(page-fault),陷入kernel的一个终端例程。中断例程中,kernal就会把触发的异常的页复制一份,于是分子进程各自持有独立的一份

COW技术可以减少分配和复制大量资源时带来的瞬间延迟,减少不必要的资源分配。

如果在fork之后,父子进程还需要进行读写操作,那么会产生大量的分页错误(页异常中断page-fault),这样就得不偿失了

Redis会根据自定义时间策略或者定时任务手动执行besave进行快照备份,如果某个出现宕机会丢失部分数据

AOF

以写日志的方式执行redis命令后,将数据写入AOF日志文件

可靠性fsync

redis命令写入过程,是先写入硬盘的缓冲区,缓冲区根据选择的策略写入到系统中,linux的glibc提供了fsync(init fd)函数可以将指定文件的内容强制从内核写入至磁盘,只要redis进程实时调用了fsync函数就可以保证AOF日志不丢失,由于fsync是一个磁盘I/O操作,所以不能每条redis指令都执行一次fsync,这样redis高性能就没有保证,有三种策略,默认推荐everysec策略:

  • always:每条命令都会写入AOF中,保证数据不会丢失,但是I/O开销会很大
  • everysec:以每秒钟为单位将缓冲区中的数据写入到硬盘,如果出现故障可能会丢失1秒钟的数据,这个也是Redis的默认值
  • no:这个策略根据操作系统定义的进行写入,不需要操作,但同时也是不可控的。
重写

将那些过期、重复的命令进行压缩减少,从而达到减少硬盘占用量,提高数据恢复速度

实现方式:

  • bgrewriteaof:类似于RDB中的bgsave
  • auto-aof-rewrite-min-size:配置AOF重写需要的最小尺寸
  • auto-aof-rewrite-percentage:配置AOF文件增长率

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 是否开启 aof
appendonly yes

# 文件名称
appendfilename "appendonly-6379.aof"

# 同步方式
appendfsync everysec

# aof 重写期间是否同步
no-appendfsync-on-rewrite no

# 重写触发配置
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb

# 加载 aof 时如果有错如何处理
aof-load-truncated yes

# 文件重写策略
aof-rewrite-incremental-fsync yes
数据损坏修复

“在过去曾经发现一些很罕见的 BUG 导致使用 AOF 重建的数据跟原数据不一致的问题。” 这句话来自 Redis 实践官方,为了应对这种罕见的 BUG 可以使用 redis-check-aof 命令修复原始的 AOF 文件

1
redis-check-aof --fix appendonly-6379.aof # appendonly-6379.aof 对应你的 aof 日志文件

选择

  • 重放优先级:系统重启时优先重放AOF备份数据,随后是RDB,因为从数据备份的完整性考虑,AOF相比RDB可靠性更高些
  • 恢复速度快:RDB采用二进制方式存储占用体积小,AOF是以日志形式存储,体积相比RDB要大,相比较来说,RDB的数据恢复速度要高于AOF
  • 数据安全性:RDB采用快照形式,在一定时间内会丢失数据,AOF相对更安全些,主要有三种策略。

一般来说,应该同时使用两种持久化功能,如果可以承受数分钟以内的数据丢失,可以只使用RDB持久化。很多用户都只使用AOF持久化,而定时生成RDB非常便于进行数据库备份,并且RDB恢复数据集的速度也要比AOF恢复的速度要快,除此之外,使用RDB还可以避免之前说过的AOFbug。

哨兵高可用

主从复制如果主节点发生故障,Redis Sentinel功能可以自动提升而不是人工干预

Sentinel是一个分布式系统,类似于Cousul集群,一般由3~5个节点组成,使用Raft算法实现领导者选举因为故障转移只需要一个Sentinel节点来完成

为什么Redis只适用于缓存而不能当做数据库来使用?

  • 性能:Redis是一个基于内存的数据库,通常用来计数器、Session存储、缓存设计等等
  • 成本:假设百万条数据,仅1%是热点数据其余都是冷数据,这种情况全部都存在Redis里面,是资源的浪费,从缓存设计角度来说,所保存的也仅是热点数据
  • 灵活性:Redis的数据结构是丰富的,支持String,HashTable,List,Set,Zset还有最新的BitMaps.GEO等,如果有很复杂的查询语句,关联查询等,还是SQL等数据库更加合适
  • 数据可靠性:第一种方式是Redis+RDB,如果发生断电,自上次持久化到最次一次故障之间的数据丢失。第二种是Redis+AOF,AOF有三种策略将数据持久化到磁盘,其中everysec是相对折中的一种方案,everysec折中方案也会存在1秒钟数据丢失的问题。

实践1——计数器实现并发场景下的优惠券领取功能

计数器在实际中有很多需求:PV/UV、接口并发限制、抽奖、优惠券领取等

业务背景

业务需求方做优惠券发放活动,共优惠券10张,参与用户100人,先到先得,假设每次并发20用户同时访问,如何保证不超领取

相关命令

  • exists:判断指定key是否存在
  • setnx:设置值,若值存在不做任何处理
  • incr:计数

每发送一次领取请求,采用incr命令进行自增,由于redis单线程的原因,可以保证原子性,不会出现超领

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// luck.js
const Redis = require('redis');
const redis = new Redis(6379,'127.0.0.1');

// 将日志写入指定文件
const fs = require('fs');
const {Console} = require('console');
const output = fs.createWriteStream('./stdout.js')
const errorOutput fs.createWriteStream('./stderr.js')
const logger = new Console(output,errorOutput);

async function luck(){
const count = 10;
const key = 'counter:luck';
const keyExists = await redis.exists(key);

if(!keyExists){
// 如果key不存在初始化设置
await redis.setnx(key,0);
}

// 增加领取数量
const result = await redis.incr(key);

if(result > count){
// 优惠券领取超限
logger.error('luck failure',result);
return;
}

logger.info('luck success',result);
}
module.export = luck;

// app.js
const http = require('http');
const luck = require('./luck');

http.createServer((req,res)=>{
if(req.url === 'luck'){
luck();

res.end('ok')
}
}).listen(3000)

实践2——Redis Lua脚本

Lua是一种轻量小巧的脚本语言,用标准c语言编写并以源代码形式开放,设计目的是为嵌入应用程序中,从而为应用程序提供灵活的扩张和定制功能。由于Lua语言具有原子性,其在执行过程中不会被其他程序打断,对于并发下的数据一致性有一定的帮助

两种Lua脚本

Redis支持两种运行Lua脚本,一种是直接Redis中输入Lua代码,适用于一些简单的脚本,另一种方式是编写Lua脚本文件,Redis使用SHA1算法支持对脚本签名和Script Load 预先缓存,需要运行的时候通过签名返回的标识符即可。

EVAL

通过内置的Lua解释器,可以使用EVAL命令对Lua脚本进行求值

  • script:指定脚本
  • numkeys:指定键名餐参数个数
  • key:键名,可以多个key,通过KEYS[1]KEYS[2]的形式访问
  • atg:键值,可以多个val,通ARGS[1]ARGS[2]的形式访问
1
2
3
4
5
6
7
EVAL script numkeys key[key...] arg[arg...]
# 通过KEYS[] 数组的形式访问ARGV[],下标是从1开始,KEYS[1]对应的键名为name1,ARGV[2]对应的值为val2
127.0.0.1:6379> EVAL "return redis.call('SET',KEYS[1],ARGV[2])" 2 name1 name2 va
l1 val2
OK
127.0.0.1:6379> get name1
"val2"

redis.call何redis.pcall是两个不同的Lua函数来调用redis命令,区别是如果redis命令中出现错误异常,redis.call会直接返回一个错误信息给调用者,而redis.pcall会以Lua的形式对错误进行捕获并返回

1
2
3
4
5
6
7
8
9
# 这里执行了两条 Redis 命令,第一条故意写了一个 SET_ 这是一个错误的命令,可以看到出错后,错误信息被抛出给了调用者,同时你执行 get name2 会得到 (nil),第二条命令也没有被执行
# redis.call
127.0.0.1:6379> EVAL "redis.call('SET_', KEYS[1], ARGV[2]); redis.call('SET', KEYS[2], ARGV[3])" 2 name1 name2 val1 val2 val3
(error) ERR Error running script (call to f_bf814e38e3d98242ae0c62791fa299f04e757a7d): @user_script:1: @user_script: 1: Unknown Redis command called from Lua script

# redis.pcall
# 一样的操作,可以看到输出结果为nil,它的错误被Lua捕获了,在执行get name2 会得到一个设置好的结果val3,这里第二条命令是被执行了的
EVAL "redis.pcall('SET_', KEYS[1], ARGV[2]); redis.pcall('SET', KEYS[2], ARGV[3])" 2 name1 name2 val1 val2 val3
(nil)
Nodejs实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// ioredis 支持所有脚本命令,比如EVAL/EVALSHA/SCRIPT,在现实场景中使用它是很繁琐的,要注意脚本缓存,并检测何时使用EVAL、EVALSHA.ioredis公开了一个defineCommand方法,使得脚本更容易使用
const Redis = require('ioredis');
const redis = new Redis(6379,'127.0.0.1');

const evalScript = "return redis.call('SET',KEYS[1],ARGV[2])";
redis.defineCommand('evalTest',{
numberOfKeys:2,
lua:evalScript
});

async function eval(){
await redis.evalTest('name1','name2','val1','val2');
const result = await redis.get('name1');
console.log(result) // val2
}

eval();

EVALSHA

EVAL命令要求每次执行脚本的时候都发送一次脚本主体script body。Redis有一个内部缓存机制,因此它不会每次都重新编译脚本,通过EVALSHA来实现,根据给定的SHA1校验码,对缓存在服务器中的脚本进行求值

  • SCRIPT FLUSH:清除所有脚本缓存
  • SCRIPT EXISTS:检查指定的脚本是否存在于脚本缓存
  • SCRIPT LOAD:讲一个脚本转入脚本缓存,但不立即运行它
  • SCRIPT KILL:杀死当前正在运行的脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
EVALSHA sha1 numkeys key [key...] arg[arg...]

# 载入脚本缓存
127.0.0.1:6379> SCRIPT LOAD "redis.pcall('SET',KEYS[1],ARGV[2])"
"3b2c59b4e98a20a3a9e631fb586218e7b70f1020"

# 生成的SHA-1签名的标识字符串用于执行之后的脚本
127.0.0.1:6379> EVALSHA 3b2c59b4e98a20a3a9e631fb586218e7b70f1020 2 name11 name12
val11 val12
(nil)

127.0.0.1:6379> get name11
"val12"
nodejs实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
const Redis = require('ioredis');
const redis = new Redis(6379,'127.0.0.1');

const evalScript = "return redis.call('SET',KEY1[1],ARGV[2])";

async function evalSHA(){
// 1.缓存脚本获取sha1值
const sha1 = await redis.script('load',evalScript);
// 2.通过evalsha执行脚本
await redis.evalsha(sha1,2,'name1','name2','val1','val2');

// 3.获取数据
const result = await redis.get('name1');
console.log(result); // val2
}

evalSHA();

Lua脚本文件

有逻辑运算的脚本,可以编写Lua脚本文件

下面是一个测试代码,通过读取两个值比较返回不同的值,通过lua脚本实现后可以多条redis命令原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
---test.lua
-- 先SET
redis.call('SET',KEYS[1],ARGV[1])
redis.call('SET',KEYS[2],ARGV[2])

-- GET 取值
local key1 = tonumber(redis.call('GET',KEYS[1]));
local key2 = tonumber(redis.call('GET',KEYS[2]));

-- 如果key1 小于 key2返回0 nil相当于false
if(key1 == nil or key == nil or key1 < key2)
then return 0
else return 1
end
nodejs加载lua脚本文件
1
2
3
4
5
6
7
8
9
10
11
12
const Redis = require('ioredis');
const redis = new Redis(6379,'127.0.0.1')
const fs = require('fs')

async function test(){
const redisLuaScript = fs.readFileSync('./test.lua');
const result1 = await redis.eval(redisLuaScript,2,'name1','nam2',20,10);
const result2 = await redis.eval(redisLuaScript,2,'name1','nam2',10,20);
console.log(result1,result2); // 1 0
}

test();

实践3——Redis分布式锁

线程锁:单线程编程模式下请求是顺序的,一个好处不需要考虑线程安全、资源竞争问题

进程锁:一个服务部署于一台服务器,同时开启多个进程,nodejs中为了利用操作系统资源,根据CPU的核心数可以开启多程模式,这个时候如果对于一个共享资源操作还是会遇到资源竞争的问题,另外每一个进程都是相互独立,拥有自己独立的内存空间,关于进程锁通过java的synchronized也很难解决,它仅局限在同一个JVM中有效

分布式锁:一个服务无论是单线程还是多进程模式,当多机部署、处于分布式环境下对统一共享资源进行操作还是会面临同样的问题。由于先读数据在通过业务逻辑修改之后进行SET操作,这并不是一个原子操作,当多个客户端对同一个资源进行先读后写操作就会引发并发问题,这个时候就需要分布式锁去解决。

实现思路

实现分布式锁的方式:数据库、redis、zookeeper,通过redis来实现一个分布式锁,保持三个特性:安全性、死锁、容错

安全性:上锁,在任意时刻保持仅有一个客户端持有该锁

死锁:造成死锁可能是由于某种原因,本该释放的锁没有被释放,因此在上锁的时候可以同步设置过期时间,由于客户端自己的原因没有被释放,也要保证锁能够自动释放

容错:容错是在多节点的模式下需要考虑的,只要保证N/2+1节点可用,客户端就可以成功获取、释放锁

Redis但实力分布式锁实现

在Redis的单节点实例下实现一个简单的分布式锁,会借助简单lua脚本来实现原子性

上锁

通过setnx命令占坑,为了防止死锁,通常在占坑之后还会设置一个过期时间expire:

1
2
setnx key value
expire key seconds

上面的操作不是一个原子性操作,原子性操作是指命令在执行过程中并不会被其他线程或者请求打断,如果setnx执行成功之后,出现网络闪断expire命令便不会得到执行,会导致死锁出现。

通过:

1
set key value [EX seconds] [PX milliseconds] [NX|XX]
释放锁

释放锁的过程是将原来占有的坑删除,加锁的过程把value设置为一个随机值,在del key之前先判断这个key存在且value等于自己制定的值才执行删除操作,判断和删除不是一个原子性操作,借助Lua脚本实现

1
2
3
4
5
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

Node实践

初始化自定义redisLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class RedisLock {
/**
* 初始化 RedisLock
* @param {*} client
* @param {*} option
*/

constructor(client, options = {}) {
if (!client) {
throw new Error('client 不存在')
}

if (client.status !== 'connecting') {
throw new Error('client 未正常链接')
}

this.lockLeaseTime = options.lockLeaseTime || 2; // 默认所过期时间 2 秒
this.lockTimeout = options.lockTimeout || 5; // 默认所过期时间 5 秒
this.expireMode = options.expireMode || 'EX';
this.setMode = options.setMode || 'NX';
this.client = client;
}

/**
* 上锁,通过set 命令传入setnx/expire拓展参数,
* 上锁成功返回,上锁失败进行重试,
* 在lockTimeout 指定时间内仍未获取到锁,则获取锁失败
* @param {*} key
* @param {*} val
* @param {*} expire
*/
async lock(key, val, expire) {
const start = Date.now();
const self = this;

return (async function intraneLock() {
try {
const result = await self.client.set(key, val, self.expireMode, expire || self.lockLeaseTime, self.setMode);

// 上锁成功
if (result === 'OK') {
console.log(`${key} ${val} 上锁成功`);
return true;
}

// 锁超时
if (Math.floor((Date.now() - start) / 1000) > self.lockLeaseTime) {
console.log(`${key} ${val} 上锁重试超时结束`)
return false;
}

// 循环等待重试
console.log(`${key} ${val} 等待重试`)
await sleep(3000);
console.log(`${key} ${val} 开始重试`)
return intraneLock();
} catch (err) {
throw new Error(err);
}
})();
}

/**
* 释放锁,通过redis.eval(script)执行自定义redis lua脚本
* @param {*} key
* @param {*} val
*/
async unLock(key, val) {
const self = this;
const script = `
if redis.call('get',KEYS[1] == ARGV[1]) then
return redis.call('del',KEYS[1])
else
return 0
end
`;
try {
const result = await self.client.eval(script, 1, key, val);
if (result === 1) {
return true;
}
return false;
} catch (err) {
throw new Error(err);
}
}
}
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const Redis = require('ioredis');
const redis = new Redis(6379, '127.0.0.1');
const uuidv1 = require('uuid/v1');
const RedisLock = require('./redisLock');

const redisLock = new RedisLock(redis);

function sleep(time) {
return new Promise((resolve) => {
setTimeout(function () {
resolve();
}, time || 1000)
})
}

async function test(key) {
try {
const id = uuidv1();
await redisLock.lock(key, id, 20);
await sleep(3000);

const unLock = await redisLock.unLock(key, id);
console.log('unLock: ', key, id, unlock);
} catch (err) {
console.log('上锁失败', err);
}
}

test('name1');
test('name1')

同时调用了两次test方法进行上锁,只有第一次是成功的,第二次name1 上锁的时候发现key=name1已被占坑,开始重试,由于以上的测试中设置了3秒之后自动释放,name1 在经过两次重试之后上锁成功

1
2
3
4
5
6
7
8
name1 26e00260-0532-11ea-b978-2160dffafa30 上锁成功
name1 26e02970-0532-11ea-b978-2160dffafa30 等待重试
name1 26e02970-0532-11ea-b978-2160dffafa30 开始重试
name1 26e02970-0532-11ea-b978-2160dffafa30 等待重试
unLock: name1 26e00260-0532-11ea-b978-2160dffafa30 true
name1 26e02970-0532-11ea-b978-2160dffafa30 开始重试
name1 26e02970-0532-11ea-b978-2160dffafa30 上锁成功
unLock: name1 26e02970-0532-11ea-b978-2160dffafa30 true
Redlock算法

在Sentinel/Redis Cluster下,如果客户端A在主节点获取到锁之后,主节点还未来得及同步信息到从节点就挂掉了,这时候Sentinel 会选举另外一个从节点作为主节点,那么客户端B此时也申请相同的锁,就会出现同一个锁被多个客户端持有,对数据的最终一致性有很高的要求还不行。

鉴于这些问题,官方提供了一个使用Redis实现分布式锁的规范算法Redlock,在单实例或者多实例提供强有力的保障,本身具备容错能力,会从N个实例使用相同的key、随机值尝试set ket value [EX seconds] [PX milliseconds] [NX|XX]命令去获取所,在有效时间内至少 N/2+1个Redis实例取到所,否则取锁失败,失败情况下客户端应该在所有Redis实例上进行解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const Redis = require('ioredis');
const client1 = new Redis(6379, '127.0.0.1');
const RedLock = require('redlock');

const redlock = new RedLock([client1], {
retryDelay: 200, // time in ms
retryCount: 5
})

// 多个Redis 实例
// const redlock = new RedLock(
// [new Redis(6379, '127.0.0.1')],
// [new Redis(6379, '127.0.0.2')],
// [new Redis(6379, '127.0.0.3')]
// )

async function test(key, ttl, client) {
try {
const loclk = await redlock.lock(key, ttl);
console.log(client, lock, value)
} catch (err) {
console.log(client, err);
}
}

test('name1', 10000, 'client1');
test('name1', 10000, 'client2');

对同一个key name1 两次上锁,由于client1 先取到了锁,client2 无法获取锁,重试5次之后报错:LockError:Exceeded 5 attempts to lock the resource 'name1'

如何让秒杀、活动倒计时更加精确

分析一下从获取服务器时间到前端显示倒计时的过程:

  1. 客户端http请求服务器时间
  2. 服务器响应完成
  3. 服务器通过网络传输时间到客户端
  4. 客户端根据活动开始时间和服务器时间差做倒计时显示

服务器响应完成的时间就是服务器时间,但是经过网络传输就会有误差,由网络环境决定误差大小,一般是几十ms或者几百ms。

可得出:当前服务器时间=服务器系统返回时间+网络传输时间+前端渲染时间+常量(可选)

前端渲染时间,避免不同浏览器渲染快慢差异差异造成明显的时间不同步,可以在服务器返回当前时间和本地签单的时间的差值得出

获得服务器时间后,前端进入倒计时计算和计时器显示,要考虑js代码和线程阻塞造成计时器延时问题,思路是通过引入计数器,判断计时器延迟执行的时间来调整,尽量让误差缩小,不同浏览器不同时间段打开页面倒计时误差可控制在1s以内

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 线程占用
setInterval(()=>{
var j = 0;
while(j++ < 100000000);
},0)

// 倒计时
let interval = 1000,
ms = 50000, //从服务器和活动开始时间计算出的时间差,测试用50000ms
count=0,
startTime = new Date().getTime(),
timeCounter = null;
if(ms >=0){
timeCounter = setTimeout(countDownStart,interval)
}
function countDownStart(){
count++;
const offset = new Date().getTime() - (startTime+count*interval);
let nextTime = interval -offset;
const daytohour = 0;
if(nextTime < 0){
nextTime = 0;
}
ms-=interval;
console.log(`误差:${offset}ms,下一次执行:${nextTime}ms后,离活动结束还有:${ms}ms`)

if(ms < 0){
clearTimer(timeCount)
}else{
timeCounter = setTimeout(countDownStart,nextTime),
}
}
0%