介绍
tproxy 是一款代理支持多种协议的代理程序,并监控查看协议内容,收集常规的通信指标如tcp连接数等,支持常见的 http2,mqtt,mongo,grpc,redis
快速图解
代码分析
启动
直接从主入口函数main开始看起
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main
import (
"flag"
"fmt"
"os"
"github.com/fatih/color"
)
var settings Settings
func main() {
//解析命令行参数,并校验参数
//启动服务
if err := startListener(); err != nil {
fmt.Fprintln(os.Stderr, color.HiRedString("[x] Failed to start listener: %v", err))
os.Exit(1)
}
}
这其中关于flag包较为完整的用法还是值得借鉴的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var (
//定义 flag 变量
)
//当输入flag数量不正确时将默认的启动参数与用法进行打印输出,直接返回,提醒用户缺少参数输入
if len(os.Args) < 1 {
flag.Usage()
return
}
flag.Parse()
if len(settings.Remote) == 0 {
fmt.Fprintln(os.Stderr, color.HiRedString("[x] Remote target required"))
//在出错的时候将默认的启动参数与用法进行打印输出,并错误退出,提示用户虽然提供了参数,但是内容不符合要求
flag.PrintDefaults()
os.Exit(1)
}
startListener 建立连接对,并启动连接监视器
监视器
主要记录tcp连接信息如下
- 总计处理过的连接数
- 当前并发连接数
- 历史最大并发连接数
- 连接开始创建的时间
- 连接存活最长的时间
处理客户端请求
接收客户端链接,建立 客户端<->tproxy<->服务端 的连接对,并设置 MultiWriter,MultiReader,将客户端数据与服务端数据发送至io.Pipe 的内存管道,提供给Dump对象,进行打印,同时对 客户端连接 与 服务端连接包装上限流和延迟发送的功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func startListener() error {
//创建并启动tcp连接计数器
stat = NewStater(NewConnCounter(), NewStatPrinter(statInterval))
go stat.Start()
conn, err := net.Listen("tcp", fmt.Sprintf("%s:%d", settings.LocalHost, settings.LocalPort))
if err != nil {
return fmt.Errorf("failed to start listener: %w", err)
}
//defer 紧跟在conn成功创建之后,很好的习惯
defer conn.Close()
display.PrintfWithTime("Listening on %s...\n", conn.Addr().String())
var connIndex int
for {
// accept 客户端连接
cliConn, err := conn.Accept()
if err != nil {
return fmt.Errorf("server: accept: %w", err)
}
connIndex++
display.PrintlnWithTime(color.HiGreenString("[%d] Accepted from: %s",
connIndex, cliConn.RemoteAddr()))
//创建连接对,并使用协程处理连接数据
pconn := NewPairedConnection(connIndex, cliConn)
go pconn.process()
}
}
处理Crtl+C信号的代码
接收 Crtl+C 信号,优雅的停止信号 挺有意思的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
go func() {
//设置接收系统 signal Ctrl+C / kill -15 信号
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
// 使用 range chan 阻塞等待 信号进入 ,个人感觉 直接 sig <- c 可能更好
for sig := range c {
// 停止接收 signal 信号
signal.Stop(c)
//执行程序停止前的处理
stat.Stop()
//找到当前进程
p, err := os.FindProcess(syscall.Getpid())
if err != nil {
fmt.Println(err)
os.Exit(0)
}
//将signal信号 重新发送给当前进程 用以关闭当前程序,不用担心 再次接收到 signal信号,毕竟已经使用 signal.Stop(c) 停止接收signal信号了
if err := p.Signal(sig); err != nil {
fmt.Println(err)
}
}
}()
连接对
接下来看项目中最重要的代理功能的实现
连接对结构
1
2
3
4
5
6
7
8
9
10
11
type PairedConnection struct {
//connection id
id int
// 下游服务连接
cliConn net.Conn
// 上游服务连接
svrConn net.Conn
once sync.Once
//关闭 chan
stopChan chan struct{}
}
连接对创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// NewPairedConnection 生成 , 设置 客户端连接 ID,客户端连接,还有 停止通道
func NewPairedConnection(id int, cliConn net.Conn) *PairedConnection {
return &PairedConnection{
id: id,
cliConn: cliConn,
stopChan: make(chan struct{}),
}
}
func (c *PairedConnection) process() {
defer c.stop()
// 连接远端服务
conn, err := net.Dial("tcp", settings.Remote)
if err != nil {
display.PrintlnWithTime(color.HiRedString("[x][%d] Couldn't connect to server: %v", c.id, err))
return
}
display.PrintlnWithTime(color.HiGreenString("[%d] Connected to server: %s", c.id, conn.RemoteAddr()))
//调用统计 新增连接方法
stat.AddConn(strconv.Itoa(c.id), conn.(*net.TCPConn))
// 保存 服务端 连接
c.svrConn = conn
// 处理 服务端 数据
go c.handleServerMessage()
// 处理 客户端 数据
c.handleClientMessage()
}
连接对数据流处理
服务端数据处理
1
2
3
4
5
6
7
8
9
10
11
12
13
func (c *PairedConnection) handleServerMessage() {
// server closed also trigger client close.
defer c.stop()
//建立内存读写通道
r, w := io.Pipe()
//构造 客户端连接 与 内存通道 的 多写对象
tee := io.MultiWriter(newDelayedWriter(c.cliConn, settings.Delay, c.stopChan), w)
//根据设置的数据协议,使用对应协议解析 内存读通道中 服务端写入的响应数据
go protocol.CreateInterop(settings.Protocol).Dump(r, protocol.ServerSide, c.id, settings.Quiet)
// 将 服务端上游 返回数据 输出到 多写对象
c.copyDataWithRateLimit(tee, c.svrConn, protocol.ServerSide, settings.DownLimit)
}
客户端数据处理
1
2
3
4
5
6
7
8
9
10
func (c *PairedConnection) handleClientMessage() {
// client closed also trigger server close.
defer c.stop()
r, w := io.Pipe()
tee := io.MultiWriter(c.svrConn, w)
//根据设置的数据协议,使用对应协议解析 内存读通道中 客户端写入的请求数据
go protocol.CreateInterop(settings.Protocol).Dump(r, protocol.ClientSide, c.id, settings.Quiet)
c.copyDataWithRateLimit(tee, c.cliConn, protocol.ClientSide, settings.UpLimit)
}
连接对关闭
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (c *PairedConnection) stop() {
// 使用 once 保证 stop 资源回收只会执行一次
c.once.Do(func() {
close(c.stopChan)
stat.DelConn(strconv.Itoa(c.id))
if c.cliConn != nil {
display.PrintlnWithTime(color.HiBlueString("[%d] Client connection closed", c.id))
c.cliConn.Close()
}
if c.svrConn != nil {
display.PrintlnWithTime(color.HiBlueString("[%d] Server connection closed", c.id))
c.svrConn.Close()
}
})
}
代码细节
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (c *PairedConnection) copyData(dst io.Writer, src io.Reader, tag string) {
_, e := io.Copy(dst, src)
//但是此处的判断 e != io.EOF 个人感觉是不需要的,因为 io.Copy 函数明确说明不会返回io.EOF 的错误,在判断 错误类型是非为 net操作类错误,如非 net 操作类错误,不做处理(这个合理吗)我去每个没吃透的点都是以后埋的坑,此处对 io.Copy 中如 io.Writer 或 io.Reader 被手动或其他原因意外关闭,则进行忽略,结束当前 Copy 即可
//https://golang-nuts.narkive.com/ae58tX8v/use-of-closed-network-connection-error-from-tcpconn-read
// useOfClosedConn 代表了 当前操作的 io.Writer 或 io.Reader 被关闭
if e != nil && e != io.EOF {
netOpError, ok := e.(*net.OpError)
if ok && netOpError.Err.Error() != useOfClosedConn {
//error 库的 使用 ?
reason := netOpError.Unwrap().Error()
display.PrintlnWithTime(color.HiRedString("[%d] %s error, %s", c.id, tag, reason))
}
}
}
//ErrFileClosing is returned when a file descriptor is used after it has been closed.
// go io.Copy的api说明
// Copy copies from src to dst until either EOF is reached
// on src or an error occurs. It returns the number of bytes
// copied and the first error encountered while copying, if any.
//
// A successful Copy returns err == nil, not err == EOF.
// Because Copy is defined to read from src until EOF, it does
// not treat an EOF from Read as an error to be reported.
//
// If src implements [WriterTo],
// the copy is implemented by calling src.WriteTo(dst).
// Otherwise, if dst implements [ReaderFrom],
// the copy is implemented by calling dst.ReadFrom(src).
func Copy(dst Writer, src Reader) (written int64, err error) {
return copyBuffer(dst, src, nil)
贡献说明
在代码细节中说的 可以不用判断 io.Copy 的 io.EOF 的判断被项目作者merge了,😄
如果你看不到评论,那么就真的看不到评论w(゜Д゜)w