pulse

package module
v0.0.0-...-e31dbf6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

README

pulse

CI codecov Go Report Card Go Reference License

极简异步io库,速度很快,非常简单

特性

  • 🚀 高性能:基于epoll/kqueue的事件驱动架构
  • 🎯 极简API:只需实现OnOpen、OnData、OnClose三个回调
  • 🔄 多任务模式:支持事件循环、协程池、独占协程三种处理模式
  • 🛡️ 并发安全:内置连接管理和状态隔离
  • 🌐 跨平台:支持Linux、macOS,Windows开发中

支持平台

  • Linux
  • macOS
  • Windows (TODO - 开发中)

安装

go get github.com/antlabs/pulse

快速开始

Echo服务器
package main

import (
    "context"
    "log"
    
    "github.com/antlabs/pulse"
)

type EchoHandler struct{}

func (h *EchoHandler) OnOpen(c *pulse.Conn) {
    if err != nil {
        log.Printf("连接失败: %v", err)
        return
    }
    log.Println("客户端连接成功")
}

func (h *EchoHandler) OnData(c *pulse.Conn, data []byte) {
    // 回显收到的数据
    c.Write(data)
}

func (h *EchoHandler) OnClose(c *pulse.Conn, err error) {
    log.Println("连接关闭")
}

func main() {
    server, err := pulse.NewMultiEventLoop(
        context.Background(),
        pulse.WithCallback(&EchoHandler{}),
        pulse.WithTaskType(pulse.TaskTypeInEventLoop),
    )
    if err != nil {
        log.Fatal(err)
    }
    
    log.Println("服务器启动在 :8080")
    server.ListenAndServe(":8080")
}
自定义协议解析
package main

import (
    "context"
    "log"
    
    "github.com/antlabs/pulse"
)

type ProtocolHandler struct{}

func (h *ProtocolHandler) OnOpen(c *pulse.Conn) {
    if err != nil {
        return
    }
    // 为每个连接初始化解析状态
    c.SetSession(make([]byte, 0))
}

func (h *ProtocolHandler) OnData(c *pulse.Conn, data []byte) {
    // 获取连接的解析缓冲区
    buffer := c.GetSession().([]byte)
    
    // 将新数据追加到缓冲区
    buffer = append(buffer, data...)
    
    // 解析完整消息
    for len(buffer) >= 4 { // 假设消息长度前缀为4字节
        msgLen := int(buffer[0])<<24 | int(buffer[1])<<16 | int(buffer[2])<<8 | int(buffer[3])
        if len(buffer) < 4+msgLen {
            break // 消息不完整,等待更多数据
        }
        
        // 提取完整消息
        message := buffer[4 : 4+msgLen]
        log.Printf("收到消息: %s", string(message))
        
        // 处理消息...
        
        // 移除已处理的消息
        buffer = buffer[4+msgLen:]
    }
    
    // 更新连接的缓冲区
    c.SetSession(buffer)
}

func (h *ProtocolHandler) OnClose(c *pulse.Conn, err error) {
    log.Println("连接关闭")
}

func main() {
    server, err := pulse.NewMultiEventLoop(
        context.Background(),
        pulse.WithCallback(&ProtocolHandler{}),
        pulse.WithTaskType(pulse.TaskTypeInEventLoop),
    )
    if err != nil {
        log.Fatal(err)
    }
    
    server.ListenAndServe(":8080")
}

客户端事件循环示例

package main

import (
	"context"
	"fmt"
	"net"
	"github.com/antlabs/pulse"
)

type MyClientHandler struct{}

func (h *MyClientHandler) OnOpen(c *pulse.Conn) {
	if err != nil {
		fmt.Println("连接失败:", err)
		return
	}
	fmt.Println("连接成功")
	c.Write([]byte("hello server!"))
}

func (h *MyClientHandler) OnData(c *pulse.Conn, data []byte) {
	fmt.Println("收到数据:", string(data))
}

func (h *MyClientHandler) OnClose(c *pulse.Conn, err error) {
	fmt.Println("连接关闭", err)
}

func main() {
	conn1, err := net.Dial("tcp", "127.0.0.1:8080")
	if err != nil {
		panic(err)
	}
	conn2, err := net.Dial("tcp", "127.0.0.1:8081")
	if err != nil {
		panic(err)
	}
	loop := pulse.NewClientEventLoop(
		context.Background(),
		pulse.WithCallback(&MyClientHandler{}),
	)
	loop.RegisterConn(conn1)
	loop.RegisterConn(conn2)
	loop.Serve()
}

主要概念

回调接口
type Callback[T any] interface {
    OnOpen(c *Conn)  // 连接建立时调用
    OnData(c *Conn, data []byte)     // 接收到数据时调用  
    OnClose(c *Conn, err error) // 连接关闭时调用
}
任务处理模式
// 在事件循环中处理(推荐,性能最好, redis和nginx场景)
pulse.WithTaskType(pulse.TaskTypeInEventLoop)

// 在协程池中处理(适合CPU密集型任务, 常见业务场景)
pulse.WithTaskType(pulse.TaskTypeInBusinessGoroutine)

// 每个连接独占一个协程(适合阻塞IO)
pulse.WithTaskType(pulse.TaskTypeInConnectionGoroutine)
连接管理
type Conn struct{}

// 写入数据
func (c *Conn) Write(data []byte) (int, error)

// 关闭连接
func (c *Conn) Close()

// 设置会话数据(用于存储连接状态)
func (c *Conn) SetSession(session any)

// 获取会话数据
func (c *Conn) GetSession() any

// 设置超时
func (c *Conn) SetReadDeadline(t time.Time) error
func (c *Conn) SetWriteDeadline(t time.Time) error

配置选项

server, err := pulse.NewMultiEventLoop(
    context.Background(),
    pulse.WithCallback(&handler{}),                    // 设置回调处理器
    pulse.WithTaskType(pulse.TaskTypeInEventLoop),     // 设置任务处理模式
    pulse.WithTriggerType(pulse.TriggerTypeEdge),      // 设置触发模式(边缘/水平)
    pulse.WithEventLoopReadBufferSize(4096),           // 设置读缓冲区大小
    pulse.WithLogLevel(slog.LevelInfo),                // 设置日志级别
)

示例项目

性能测试

# 启动echo服务器
cd example/echo/server && go run server.go

# 使用wrk进行压测
wrk -t12 -c400 -d30s --script=lua/echo.lua http://127.0.0.1:8080

最佳实践

  1. 状态管理:使用 SetSession/GetSession 存储连接级别的状态
  2. 协议解析:推荐使用无状态解析函数,避免全局共享状态
  3. 错误处理:在OnClose中正确处理错误
  4. 内存管理:及时释放大的临时缓冲区,使用连接池复用连接
  5. 并发安全:避免在多个goroutine中同时操作同一个连接

架构设计

应用层 ┌─────────────────────────────────────┐
      │  OnOpen / OnData / OnClose          │
      └─────────────────────────────────────┘
框架层 ┌─────────────────────────────────────┐
      │  Connection Management              │
      │  Task Scheduling                    │  
      │  Event Loop                         │
      └─────────────────────────────────────┘
系统层 ┌─────────────────────────────────────┐
      │  epoll (Linux) / kqueue (macOS)     │
      └─────────────────────────────────────┘

贡献

欢迎提交Issue和Pull Request!

许可证

Apache License 2.0

Documentation

Overview

Copyright 2023-2024 antlabs. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const TriggerTypeEdge = core.TriggerTypeEdge

边缘触发

View Source
const TriggerTypeLevel = core.TriggerTypeLevel

水平触发

Variables

This section is empty.

Functions

func DebugConns

func DebugConns(conns *core.SafeConns[Conn], maxConns int)

debug 函数,用于打印连接的缓冲区使用情况

func WithCallback

func WithCallback(callback Callback) func(*Options)

设置回调函数

func WithEventLoopReadBufferSize

func WithEventLoopReadBufferSize(size int) func(*Options)

设置event loop里面读buffer的大小

func WithFlowBackPressure

func WithFlowBackPressure(enable bool) func(*Options)

设置流量背压机制,当连接的写缓冲区满了,会暂停读取,直到写缓冲区有空闲空间 目前垂直(et)触发模式下会有问题 如果要在et模式实现流量背压机制,就需要自己管理fd的可读/可写状态, 因为内核只会在 不可读->可读 的时候触发事件, 可读但是未读取的时候不会触发事件 可写同理, 可写->不可写 的时候触发事件, 不可写但是未写入的时候不会触发事件 TODO et模式支持下

func WithFlowBackPressureRemoveRead

func WithFlowBackPressureRemoveRead(enable bool) func(*Options)

设置流量背压机制,当连接的写缓冲区满了,会移除读事件,直到写缓冲区有空闲空间 第二种背压机制会比第一种背压机制更高效(lt模式下,et没有实现), 7945hx cpu上,第二种是3.4GB/s的读写 第一种是3.0GB/s的读写 TODO et模式优化下

func WithLogLevel

func WithLogLevel(level slog.Level) func(*Options)

设置日志级别

func WithMaxSocketReadTimes

func WithMaxSocketReadTimes(maxSocketReadTimes int) func(*Options)

单次可读事情,最大读取次数(水平触发模式有效)

func WithTaskType

func WithTaskType(taskType TaskType) func(*Options)

选择task的类型

func WithTriggerType

func WithTriggerType(triggerType core.TriggerType) func(*Options)

设置水平触发还是边缘触发

Types

type Callback

type Callback interface {
	OnOpen(c *Conn)
	OnData(c *Conn, data []byte)
	OnClose(c *Conn, err error)
}

func ToCallback

func ToCallback(onOpen OnOpen, onData OnData, onClose OnClose) Callback

工具函数,回调函数转成callback接口

type ClientEventLoop

type ClientEventLoop struct {
	*MultiEventLoop
	// contains filtered or unexported fields
}

func NewClientEventLoop

func NewClientEventLoop(ctx context.Context, opts ...func(*Options)) *ClientEventLoop

func (*ClientEventLoop) RegisterConn

func (loop *ClientEventLoop) RegisterConn(conn net.Conn) error

func (*ClientEventLoop) Serve

func (loop *ClientEventLoop) Serve()

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

func (*Conn) Close

func (c *Conn) Close()

func (*Conn) GetSession

func (c *Conn) GetSession() any

func (*Conn) SetDeadline

func (c *Conn) SetDeadline(t time.Time) error

func (*Conn) SetNoDelay

func (c *Conn) SetNoDelay(nodelay bool) error

func (*Conn) SetReadDeadline

func (c *Conn) SetReadDeadline(t time.Time) error

func (*Conn) SetSession

func (c *Conn) SetSession(session any)

func (*Conn) SetWriteDeadline

func (c *Conn) SetWriteDeadline(t time.Time) error

func (*Conn) Write

func (c *Conn) Write(data []byte) (int, error)

type MultiEventLoop

type MultiEventLoop struct {
	// contains filtered or unexported fields
}

func NewMultiEventLoop

func NewMultiEventLoop(ctx context.Context, options ...func(*Options)) (e *MultiEventLoop, err error)

func (*MultiEventLoop) Free

func (e *MultiEventLoop) Free()

func (*MultiEventLoop) ListenAndServe

func (e *MultiEventLoop) ListenAndServe(addr string) error

type OnClose

type OnClose func(c *Conn, err error)

type OnData

type OnData func(c *Conn, data []byte)

type OnOpen

type OnOpen func(c *Conn, err error)

type Options

type Options struct {
	// contains filtered or unexported fields
}

边缘触发

type TaskType

type TaskType int
const (
	// 在业务协程池中执行
	TaskTypeInBusinessGoroutine TaskType = iota
	// 在event loop中执行
	TaskTypeInEventLoop
	// 一个连接独占一个协程
	TaskTypeInConnectionGoroutine
)

Directories

Path Synopsis
example
core/client command
core/server command
echo/client command
echo/server command
tlv command
tlv/client command
tlv/server command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL