Geodat: Reduce peak memory usage #5581

Merged
RPRX merged 5 commits into XTLS:main from Meo597:geodat-peak-mem on Jan 26, 2026

Conversation

@Meo597
Collaborator

@Meo597 Meo597 commented Jan 20, 2026

  1. The entire geodat file gets loaded into memory.
    The file is only a few dozen MB, so this alone doesn't really need fixing; forcing mmap wouldn't help much since it comes down to luck. The alternatives are switching to streaming, or splitting the file up and using ext:.
    This PR has switched to streaming.
  2. Finding (find) only the needed categories, which doesn't consume extra memory.
  3. protobuf unmarshals into []*router.Domain, and the memory from step 1 is actively GC'd.
  4. Various matchers are built from []*router.Domain.
    Because the two coexist and raise the peak, this PR instead nils each entry as its matcher is created (see the sketch below).
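For point 4, a minimal sketch of the nil-as-you-go idea; Entry and Matcher here are stand-in types for illustration, not the actual Xray ones:

package main

import "fmt"

// Entry stands in for one deserialized geodat record (e.g. *router.Domain);
// Matcher stands in for whatever matcher type gets built from it.
type Entry struct{ Value string }
type Matcher struct{ pattern string }

// buildMatchers releases each entry as soon as its matcher exists, so the full
// slice of entries and the full set of matchers never coexist at the peak.
func buildMatchers(entries []*Entry) []Matcher {
	matchers := make([]Matcher, 0, len(entries))
	for i, e := range entries {
		matchers = append(matchers, Matcher{pattern: e.Value})
		entries[i] = nil // drop the reference so the GC can reclaim it early
	}
	return matchers
}

func main() {
	entries := []*Entry{{Value: "example.com"}, {Value: "example.org"}}
	fmt.Println(len(buildMatchers(entries)))
}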

@Meo597
Collaborator Author

Meo597 commented Jan 20, 2026

I tried each of these nil points;
heap memory drops by anywhere from a dozen to several dozen MB.

@RPRX
Member

RPRX commented Jan 21, 2026

@hossinasaadi Didn't you say this might affect the API? Please test it. The next release is the v26.1.23 stable, and I don't want to merge anything buggy into it.

@Meo597
Collaborator Author

Meo597 commented Jan 21, 2026

"might affect the API"

I think that's nonsense.

@j4Uq
Contributor

j4Uq commented Jan 21, 2026

Actually, the previously reverted PR did help Xray's startup speed somewhat. On my router the previous version started in 2-4 s; the current version takes about 8 s.

@Meo597
Collaborator Author

Meo597 commented Jan 21, 2026

The reverted code was effectively equivalent to these two lines:
rr.Geoip = nil
rr.Domain = nil

while this PR frees that memory even earlier.

As for why it would affect startup speed, I have no idea. Could you test this PR?

@Meo597
Collaborator Author

Meo597 commented Jan 21, 2026

@Fangliding How did you measure the peak-memory reduction for those PRs of his?

This is how I observe the before/after change:

{
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("heap: %d MB | sys: %d MB\n",
		m.HeapAlloc/1024/1024, m.Sys/1024/1024)
}

@Fangliding
Member

#5480 (comment)

@Meo597
Collaborator Author

Meo597 commented Jan 21, 2026

With memory that plentiful, this kind of measurement isn't reliable: nothing forces a GC, so the result is down to luck.
I'll spin up a small VPS and try it there.

@Fangliding
Member

Fangliding commented Jan 21, 2026

Set GOMEMLIMIT=50MiB; that should be rigorous enough. I forgot this existed at the time.
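For reference, the same limit can be applied from inside a test binary via runtime/debug (Go 1.19+), combined with the measurement snippet above; a rough sketch:

package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
)

func main() {
	// Same effect as running with GOMEMLIMIT=50MiB: a soft limit that makes
	// the runtime GC aggressively instead of letting the heap grow freely.
	debug.SetMemoryLimit(50 << 20)

	// ... load geodat / build the matchers here ...

	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("heap: %d MB | sys: %d MB\n", m.HeapAlloc>>20, m.Sys>>20)
}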

@Meo597
Collaborator Author

Meo597 commented Jan 21, 2026

Still hard to measure reliably; sometimes it shows an effect, sometimes it doesn't.

@Meo597
Collaborator Author

Meo597 commented Jan 21, 2026

Actually, the previously reverted PR did help Xray's startup speed somewhat. On my router the previous version started in 2-4 s; the current version takes about 8 s.

"Configuration OK." does not mean startup actually succeeded; the reverted change had moved loadgeo to after that message, so the speedup is an illusion.

@Loyalsoldier
Contributor

You can parse the Geodat file directly byte by byte; see what I wrote a few years ago: v2fly/v2ray-core#934

After V2Ray released v5, the relevant code was moved into decode.go.
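For context, a rough sketch of that byte-level scan using protowire; field numbers follow the published geosite schema (GeoSiteList.entry = 1, GeoSite.country_code = 1), and this is only the idea, not the decode.go code itself:

package main

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"strings"

	"google.golang.org/protobuf/encoding/protowire"
)

// countryCode pulls field 1 (country_code) out of one serialized GeoSite entry.
func countryCode(entry []byte) string {
	for len(entry) > 0 {
		num, typ, n := protowire.ConsumeTag(entry)
		if n < 0 {
			return ""
		}
		entry = entry[n:]
		if num == 1 && typ == protowire.BytesType {
			v, m := protowire.ConsumeBytes(entry)
			if m < 0 {
				return ""
			}
			return string(v)
		}
		n = protowire.ConsumeFieldValue(num, typ, entry)
		if n < 0 {
			return ""
		}
		entry = entry[n:]
	}
	return ""
}

// findGeoSite walks the file one GeoSiteList.entry at a time and returns the raw
// bytes of the entry whose country_code matches, so the rest of the file never
// has to sit in memory at once.
func findGeoSite(path, code string) ([]byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	r := bufio.NewReader(f)

	for {
		tag, err := binary.ReadUvarint(r) // top-level field tag
		if err == io.EOF {
			return nil, fmt.Errorf("country code %q not found", code)
		} else if err != nil {
			return nil, err
		}
		if protowire.Number(tag>>3) != 1 || protowire.Type(tag&7) != protowire.BytesType {
			return nil, fmt.Errorf("unexpected top-level field")
		}
		size, err := binary.ReadUvarint(r) // length of this GeoSite entry
		if err != nil {
			return nil, err
		}
		entry := make([]byte, size)
		if _, err := io.ReadFull(r, entry); err != nil {
			return nil, err
		}
		if strings.EqualFold(countryCode(entry), code) {
			return entry, nil // proto.Unmarshal just this one entry
		}
	}
}

func main() {
	entry, err := findGeoSite("geosite.dat", "cn")
	fmt.Println(len(entry), err)
}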

@RPRX
Member

RPRX commented Jan 22, 2026

Actually, the previously reverted PR did help Xray's startup speed somewhat. On my router the previous version started in 2-4 s; the current version takes about 8 s.

Startup speed doesn't really matter, though; a few seconds more or less makes no difference.

@Meo597
Collaborator Author

Meo597 commented Jan 22, 2026

You can parse the Geodat file directly byte by byte; see what I wrote a few years ago: v2fly/v2ray-core#934

@RPRX His approach streams the file, so the whole geodat never gets read into RAM; it would resolve the 1+3 peak.

@Meo597
Collaborator Author

Meo597 commented Jan 22, 2026

That said, I don't think we need it for now: geodat isn't that large and gets GC'd right after unmarshaling.
It won't push us over a threshold at this step, unless Xray's startup process ever becomes parallel.

@LGA1150
LGA1150 commented Jan 22, 2026

On Linux, the memory used by an mmap'd file is page cache: it doesn't count toward the heap, it is evicted first when the system runs low on memory (and simply re-read on the next access), and it saves one copy. Heap memory, by contrast, becomes anonymous pages, which can only be paged out to swap when memory is tight. So I still think mmap is the better way to read the file.
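For illustration, replacing a heap read with a read-only mapping looks roughly like this in Go, assuming golang.org/x/sys/unix (Linux/macOS); this is a sketch, not what this PR does:

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

// mmapFile maps the file read-only. The returned bytes are backed by page
// cache rather than the Go heap, and must be unmapped when no longer needed.
func mmapFile(path string) ([]byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close() // the mapping stays valid after the fd is closed

	st, err := f.Stat()
	if err != nil {
		return nil, err
	}
	return unix.Mmap(int(f.Fd()), 0, int(st.Size()), unix.PROT_READ, unix.MAP_SHARED)
}

func main() {
	data, err := mmapFile("geosite.dat")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer unix.Munmap(data)
	fmt.Println("mapped", len(data), "bytes")
}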

@Meo597
Collaborator Author

Meo597 commented Jan 22, 2026

The startup memory peak is being lowered mainly for iOS,
and I don't know how mmap behaves on iOS specifically.

I don't think 1+3 warrants any change at all; even this PR is of limited value,
unless there is clear evidence that startup actually fails.

The real iOS problem is still that resident memory after startup is too high.

@LGA1150
LGA1150 commented Jan 22, 2026

I'm not familiar with iOS kernel internals, but mmap'ing the file definitely helps low-memory devices such as OpenWrt hardware routers.

@LGA1150
LGA1150 commented Jan 22, 2026

Actually iOS could be tested quite simply: mmap a file far larger than the memory limit and read it in full (for example, compute its checksum). On Linux an mmap'd file can exceed physical memory; the system evicts unused page cache on an LRU basis.
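A quick sketch of that probe, reusing the same unix.Mmap call as above (hugefile.bin is a placeholder name for any file larger than the memory limit):

package main

import (
	"fmt"
	"hash/crc32"
	"os"

	"golang.org/x/sys/unix"
)

func main() {
	f, err := os.Open("hugefile.bin")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	st, err := f.Stat()
	if err != nil {
		panic(err)
	}
	data, err := unix.Mmap(int(f.Fd()), 0, int(st.Size()), unix.PROT_READ, unix.MAP_SHARED)
	if err != nil {
		panic(err)
	}
	defer unix.Munmap(data)

	// Touch every page: if the OS reclaims clean page cache under pressure
	// (as Linux does), this finishes without the process being killed.
	fmt.Printf("checksum %08x over %d bytes\n", crc32.ChecksumIEEE(data), len(data))
}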

@Meo597
Collaborator Author

Meo597 commented Jan 22, 2026

Right, we'll wait for their feedback. If needed, @Loyalsoldier's code can be ported over, and mmap could be stitched in on top.
This PR does something else.

@RPRX
Member

RPRX commented Jan 23, 2026

Honestly, I've always felt that if a device can't even absorb a small memory peak at startup, it will blow up sooner or later while actually proxying.

@hossinasaadi
Contributor

hossinasaadi commented Jan 25, 2026

Hi, there was an internet blackout here and I missed a lot.
@Meo597 I made these changes before the mmap implementation, so the behavior is the same as before mmap. I had also misread rr.rule = nil as config.rule, so your changes don't affect the API, for the record.
I checked this repo on iOS and it crashes at startup because it exceeds the memory limit.
I then tested #5488, which starts at ~25 MB RAM usage.
With the cached domain matcher #5505, it drops to ~15 MB at startup.

@RPRX
Member

RPRX commented Jan 25, 2026

@hossinasaadi Those earlier PRs caused some problems and we couldn't reach the author, so they were reverted for now. I don't think we need anything too complicated; #5505 should be enough.

@RPRX
Member

RPRX commented Jan 25, 2026

In other words, lowering the startup memory peak doesn't call for deep, complicated changes; the small changes in this PR are enough, and on iOS people should use #5505.

@Meo597 @hossinasaadi What do you two think?

@Meo597
Collaborator Author

Meo597 commented Jan 25, 2026

The real iOS problem is runtime memory usage, not the peak.
#5505 changes too much to leave room for further reducing runtime usage.

Someone said that what mmap uses is swap;
in that case, would mmap alone be enough to solve the iOS problem?

@Meo597
Collaborator Author

Meo597 commented Jan 25, 2026

We should first pin down exactly at which step iOS starts to crash:

1+3 or 3+4

@Meo597
Collaborator Author

Meo597 commented Jan 26, 2026

@hossinasaadi Which app is this?
It has its own special behavior: on sleep/wake it restarts the core but does not re-read the config file.

This PR frees the deserialized config entries from memory while the matchers are being built.

@RPRX

This comment was marked as off-topic.

@enriquephl

This comment was marked as spam.

@RPRX

This comment was marked as off-topic.

@hossinasaadi
Contributor

@Meo597 It's a custom app; it does read the config file again, and it seems to crash while reading the CN geodat at

MphDomainMatcher is enabled for 5 domain rule(s)
It creates the MphDomainMatcher with 114145 domains and crashes here...

I'm checking it further.

@hossinasaadi
Contributor

@RPRX @Meo597

Tested on iOS 26

Checked on iOS: it now starts at 23-25 MB (seems normal).

On iOS 15.8 it crashes at startup, but with mmap it's OK. I think for iOS we should use mmap! 🤔

@RPRX
Member

RPRX commented Jan 26, 2026

@enriquephl Every time I see the things you post my blood pressure spikes. The problems are serious:

  1. "I just want to confirm whether VLESS is fixed in this regard before I try it." Why not just switch and try it directly? Why take up the developers' time??
  2. You were told to switch, and that the old reverse will be removed, yet you say you'll wait until it's removed before switching, and then complain about the problem some more. Are you trying to solve the problem or trying to infuriate people??

1 is trivial QA, and what is 2 even supposed to mean? You even opened an issue to ask about it; is that what issues are for? If you have the energy to spam like this, why not just switch and try it yourself??

@RPRX
Member

RPRX commented Jan 26, 2026

It takes a special talent to send a developer's blood pressure through the roof within a few messages. Wasting my time is one thing, but even after getting your answer you still won't change your config and insist on complaining some more. What exactly are you here for??

I only corrected that message several times so it could go up on the channel. I have honestly never seen anyone like this; it makes me furious.

@KiGamji
Contributor

KiGamji commented Jan 26, 2026

@enriquephl just stick with battle-tested mainline protocols and you'll be fine

@paqx
Contributor

paqx commented Jan 27, 2026

@RPRX @KiGamji

I don't know where else to put it. Since you discussed the new XDRIVE protocol here, I can share it here.

I originally wrote about Yandex Disk (#5414 (comment)), which is basically like Google Drive. But then I did some research and realized that it might be even better to use S3-compatible stores as proxies, because Yandex Disk, Google Drive, and other cloud drives probably all have different APIs, so adding a single unified transport for all of them would be harder than targeting S3 stores, which should all be compatible with each other.

With the help of AI, I wrote a simple proof of concept. It's a server and a client. They use a locally running instance of Minio as a proxy. The server is running and polling the S3 store at a fixed interval. When a new chunk of data appears, it fetches it. When all chunks are fetched, the file is re-assembled.

Link to the code:
https://github.com/paqx/s3proxytest

I only tested it locally so far. I will try to use a real S3 store (e.g. Yandex and VK Cloud both have them) later. The binaries are quite large, though; I guess I should use plain REST API requests instead of the Go S3 library.

What do you think?

@RPRX
Member

RPRX commented Jan 27, 2026

@paqx Excellent. How did you know I haven't started writing code yet? PR welcome.

I've also considered using cloud storage services, but cloud drives have two advantages: first, they're free; second, they're consumer services in much wider use, so their domains can be used to hide the proxy.

For S3 stores, could you list their prices and which domains we could use to hide the proxy? Services inside Russia are fine, as long as they are also reachable from abroad.

Of course, XDRIVE's goal isn't just cloud-drive compatibility but exploiting anything that could possibly work, so S3 stores can be supported as one of the options; preferably without pulling in a library.

"No public IP of your own required" is what separates it from XHTTP in design: XDRIVE's hub doesn't listen on any port, so it can sit behind, say, a residential connection abroad or inside a free container service.

We found that Google Drive can't do "use while uploading", and creating too many small files gets the account banned, so we are experimenting with revisions.

Since the XDRIVE plan is already public, we might as well discuss progress openly from now on and pool ideas. Below is the email @iambabyninja sent me today:

Hi.
Based on today’s results, I have a strong impression that Google Drive is ().

The key issues I ran into:
 1. Lack of read-while-write
In Google Drive, there is effectively only one file update model:
files.update
→ full upload
→ commit
→ the revision becomes visible

It is impossible to read a file before the upload is completed. Neither resumable upload nor partial download allows access to data before the revision is committed.
 2. Eventual consistency in the Drive API

After an upload completes, the file or the new revision does not appear in listings immediately. The delay can be several seconds, which completely breaks any attempt to build a deterministic protocol on top of revisions.
 3. Limitations of appProperties on revisions

I planned to use appProperties to store service metadata, for example a seq value, but discovered the following:
 • appProperties do not exist on the Revision object
 • these properties apply only to the current state of the file
 • when a new revision is created, previous values are overwritten

As a result, we lose the ability to know what seq revision 1 had if the current revision is already 5. This is critical for the protocol.
 4. Limits on the number of files

Creating a separate file for each chunk of data is not feasible. API limits are hit very quickly, so this option is eliminated immediately.
 5. Data races when overwriting files

If we use a limited number of files and simply overwrite their contents, data races become inevitable. Drive does not provide sufficient synchronization primitives to handle this correctly.

In the end, I arrived at a somewhat workable approach. Attached is a Go file with a minimal implementation. Credentials and parameters are currently hardcoded as constants.

The code quality is far from ideal. Go is not my primary language, and I have very little experience with it. Still, I hope the logic is readable and understandable.

nebulabox added a commit to nebulabox/Xray-core that referenced this pull request Jan 28, 2026
* commit 'f6a7e939231e5ec6b167628bf730dc70a3c36707': (90 commits)
  VMess inbound: Optimize replay filter (XTLS#5562)
  Bump github.com/pires/go-proxyproto from 0.9.1 to 0.9.2 (XTLS#5614)
  TUN inbound: Add iOS support (XTLS#5612)
  Geodat: Reduce peak memory usage (XTLS#5581)
  Bump github.com/pires/go-proxyproto from 0.9.0 to 0.9.1 (XTLS#5608)
  Hysteria transport: Support range & random for `interval` in `udphop` as well (XTLS#5603)
  TUN inbound: Enhance Darwin interface support (XTLS#5598)
  XUDP client: Initialize Global ID's BaseKey correctly (XTLS#5602)
  TUN inbound: Disable RACK/TLP recovery to fix connection stalls (XTLS#5600)
  v26.1.23
  common/errors/feature_errors.go: Add PrintNonRemovalDeprecatedFeatureWarning() (XTLS#5567)
  API: Add ListRule() for routing (XTLS#5569)
  Log config: More flexible `maskAddress` (XTLS#5570)
  Bump github.com/miekg/dns from 1.1.70 to 1.1.72 (XTLS#5590)
  Bump github.com/cloudflare/circl from 1.6.2 to 1.6.3 (XTLS#5589)
  Hysteria transport: Fix speedtest issue (XTLS#5587)
  README.md: Add fancyss to Asuswrt-Merlin Clients
  Router: Fix panic in ProcessNameMatcher when source IPs are empty (XTLS#5574)
  README.md: Update links for PassWall & PassWall 2 (XTLS#5572)
  Tests: Reduce RAM usage (XTLS#5577)
  ...

# Conflicts:
#	core/core.go
@Kc2353
Kc2353 commented Jan 28, 2026

I have an idea, but I'd like to know where the current difficulty lies. It targets high-latency links, or rather links with low IOPS but not-(too-)low bandwidth.
The rough approach: make each individual packet large and push the send interval to the order of seconds, batching all data from a 1-2 second window into one transmission; or, more aggressively, "semi-block" all connections and control the send timing manually. Which underlying limitation is this currently constrained by?

@RPRX
Member

RPRX commented Jan 28, 2026

A cloud drive's list can lag by several seconds, which is actually acceptable: only the handshake takes a few extra seconds, and for high-speed download/upload scenarios it makes little difference.

Even with revisions, no state machinery is needed: it's enough to upload in order and read in order, without even caring whether the other side has read anything yet, though there should be a 30-second inactivity timeout.
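For what it's worth, the 30-second inactivity rule is just a timestamp check around the poll loop; a hypothetical sketch (the test code pasted later in the thread does not implement it):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const inactivityTimeout = 30 * time.Second

// pollLoop polls the storage via fetch and gives up after 30 s without any
// new segment, which is the inactivity timeout described above.
func pollLoop(ctx context.Context, fetch func() ([]byte, bool)) error {
	last := time.Now()
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if data, ok := fetch(); ok {
				last = time.Now()
				_ = data // hand the chunk over to the tunnel here
			} else if time.Since(last) > inactivityTimeout {
				return errors.New("peer inactive for 30s, closing session")
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// No data source wired up here, so this simply exits via ctx after 2 s.
	fmt.Println(pollLoop(ctx, func() ([]byte, bool) { return nil, false }))
}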

@paqx
Contributor

paqx commented Jan 28, 2026

Very good. How do you know I haven't started writing code yet? PR welcome

Well, I didn't know, but you mentioned Google Drive, so I thought it wouldn't use S3.

For S3 stores, can you list their prices, and which domains we can use to hide proxies? It's acceptable if it's within Russia but accessible from abroad.

Here's a link to Yandex Cloud:
https://yandex.cloud/en/services/storage
https://yandex.cloud/en/docs/storage/pricing

VK Cloud:
https://cloud.vk.com/docs/en/storage/s3
https://cloud.vk.com/docs/en/storage/s3/tariffication

Edge Center (they are too greedy, I wouldn't consider this option unless it's really necessary):
https://edgecenter.ru/storage
https://edgecenter.ru/storage/price

Check if any of them are accessible for you. There are probably others, but I listed these S3 stores because they are probably white-listed in Russia and will allow users to bypass mobile internet restrictions.

The general idea is to make each individual packet larger, with transmission intervals on the order of seconds, and to package and send all the data within 1-2 seconds. Or, to be more aggressive, "semi-block" all connections and choose to manually control the timing of packet transmission. What underlying limitation is this currently subject to?

It seems to me it can be configurable (a rough sketch follows the list):

  1. A client and server use smaller chunks and poll the storage more frequently (e.g. every 100-200 ms) - to get low latency
  2. A client and server use bigger chunks and poll the storage less frequently (e.g. every 500-1000 ms) - to reduce the number of requests and not hit the rate limits of the storage.
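Purely as an illustration of those two profiles (all names here are hypothetical; no such settings exist in Xray yet):

package main

import (
	"fmt"
	"time"
)

// Profile is a hypothetical XDRIVE tuning profile.
type Profile struct {
	ChunkSize    int           // bytes buffered before each upload
	PollInterval time.Duration // how often the peer polls/lists the storage
}

var (
	// Low latency: small chunks, frequent polling (more requests).
	LowLatency = Profile{ChunkSize: 64 << 10, PollInterval: 150 * time.Millisecond}
	// Rate-limit friendly: bigger chunks, slower polling (fewer requests).
	LowRequestRate = Profile{ChunkSize: 512 << 10, PollInterval: 750 * time.Millisecond}
)

func main() {
	fmt.Printf("%+v\n%+v\n", LowLatency, LowRequestRate)
}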

@RPRX
Member

RPRX commented Jan 28, 2026

@paqx Just test whether their API endpoints are reachable from both inside and outside the country.

The latest tests show that Google Drive only "requires verification" for "problematic accounts"; new accounts don't seem to have that issue. The send interval will of course be configurable.

Honestly, 500 ms is plenty for ordinary browsing, uploads/downloads, and watching video; nobody expects to play games over this.

@RPRX
Member

RPRX commented Jan 28, 2026

Below is the test code mentioned above (sensitive information removed). @paqx Since you have some time now, feel free to submit an XDRIVE transport-layer PR, thanks.

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"strconv"
	"sync"
	"time"

	"golang.org/x/oauth2"
	"google.golang.org/api/drive/v3"
	"google.golang.org/api/option"
)

const (
	ClientID     = ""
	ClientSecret = ""
	RefreshToken = ""
	FolderID     = ""

	TestDataSize    = 5 * 1024 * 1024
	SegmentSize     = 512 * 1024
	RetentionWindow = 30
	PollInterval    = 500 * time.Millisecond
)

func main() {
	log.SetFlags(log.Ltime | log.Lmicroseconds)

	srv := driveService()

	log.Println("SYS: INIT WRITER")
	writer := NewLogWriter(srv)

	ctx, cancel := context.WithCancel(context.Background())
	defer func() {
		log.Println("SYS: CLEANUP")
		srv.Files.Delete(writer.manifestID).Do()
		for _, fid := range writer.activeSegments {
			srv.Files.Delete(fid).Do()
		}
	}()

	log.Println("SYS: INIT READER")
	reader := &LogReader{
		srv:         srv,
		manifestID:  writer.manifestID,
		expectedSeq: 0,
	}
	dataChannel := make(chan []byte, 100)
	go reader.Start(ctx, dataChannel)

	log.Println("SYS: GEN TEST DATA")
	testData := make([]byte, TestDataSize)
	for i := range testData {
		testData[i] = byte(i % 255)
	}

	go func() {
		log.Println("SYS: START STREAM")
		chunkSize := 64 * 1024
		for i := 0; i < len(testData); i += chunkSize {
			end := i + chunkSize
			if end > len(testData) {
				end = len(testData)
			}
			writer.Write(testData[i:end])
			time.Sleep(10 * time.Millisecond)
		}
		writer.Close()
		log.Println("SYS: WRITER FINISHED")
	}()

	var receivedData []byte

	// Client Loop
	for len(receivedData) < len(testData) {
		chunk := <-dataChannel
		receivedData = append(receivedData, chunk...)
		log.Printf("Client: Received total %d / %d bytes", len(receivedData), len(testData))
	}

	cancel()
	time.Sleep(1 * time.Second)

	if bytes.Equal(testData, receivedData) {
		log.Println("SYS: DATA VERIF. ALL OK.")
	} else {
		log.Println("SYS: OH SHIT...")
	}
}

func driveService() *drive.Service {
	ctx := context.Background()
	cfg := &oauth2.Config{
		ClientID:     ClientID,
		ClientSecret: ClientSecret,
		Endpoint:     oauth2.Endpoint{TokenURL: "https://oauth2.googleapis.com/token"},
	}
	token := &oauth2.Token{
		RefreshToken: RefreshToken,
		Expiry:       time.Now().Add(-time.Hour),
		TokenType:    "Bearer",
	}

	httpClient := &http.Client{Timeout: 60 * time.Second}
	ctx = context.WithValue(ctx, oauth2.HTTPClient, httpClient)

	srv, err := drive.NewService(ctx, option.WithTokenSource(cfg.TokenSource(ctx, token)))
	if err != nil {
		log.Fatal("Failed to create Drive service: ", err)
	}
	return srv
}

func retryForever(name string, operation func() error) {
	backoff := 500 * time.Millisecond
	maxBackoff := 10 * time.Second
	attempt := 1

	for {
		err := operation()
		if err == nil {
			return
		}
		log.Printf("SYS: Retry %s failed (attempt %d): %v. Retrying in %v...", name, attempt, err, backoff)
		time.Sleep(backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
		attempt++
	}
}

type LogWriter struct {
	srv            *drive.Service
	manifestID     string
	mu             sync.Mutex
	buf            []byte
	seq            int
	activeSegments map[int]string
}

func NewLogWriter(srv *drive.Service) *LogWriter {
	var manifestID string
	retryForever("Init Manifest", func() error {
		m, err := srv.Files.Create(&drive.File{
			Name:          "xdrive_wal_manifest.json",
			Parents:       []string{FolderID},
			AppProperties: map[string]string{"type": "wal"},
		}).Fields("id").Do()
		if err == nil {
			manifestID = m.Id
		}
		return err
	})

	return &LogWriter{
		srv:            srv,
		manifestID:     manifestID,
		activeSegments: make(map[int]string),
	}
}

func (w *LogWriter) Write(p []byte) {
	w.mu.Lock()
	defer w.mu.Unlock()

	w.buf = append(w.buf, p...)
	if len(w.buf) >= SegmentSize {
		w.flush()
	}
}

func (w *LogWriter) Close() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if len(w.buf) > 0 {
		w.flush()
	}
}

func (w *LogWriter) flush() {
	data := w.buf
	currentSeq := w.seq
	fileName := fmt.Sprintf("seg_%09d.bin", currentSeq)
	var fileID string

	retryForever(fmt.Sprintf("Upload Seg %d", currentSeq), func() error {
		f, err := w.srv.Files.Create(&drive.File{
			Name:    fileName,
			Parents: []string{FolderID},
		}).Media(bytes.NewReader(data)).Fields("id").Do()
		if err == nil {
			fileID = f.Id
		}
		return err
	})

	w.activeSegments[currentSeq] = fileID

	retryForever("Update Manifest", func() error {
		props := make(map[string]string)
		props[fmt.Sprintf("s%d", currentSeq)] = fileID

		oldSeq := currentSeq - RetentionWindow
		if _, ok := w.activeSegments[oldSeq]; ok {
			props[fmt.Sprintf("s%d", oldSeq)] = ""
		}

		_, err := w.srv.Files.Update(w.manifestID, &drive.File{
			AppProperties: props,
		}).Fields("id").Do()
		return err
	})

	log.Printf("WRITER: COMMITTED SEQ=%d", currentSeq)

	w.seq++
	w.buf = nil

	oldSeq := currentSeq - RetentionWindow
	if oldID, ok := w.activeSegments[oldSeq]; ok {
		delete(w.activeSegments, oldSeq)
		go func(fid string) {
			w.srv.Files.Delete(fid).Do()
		}(oldID)
	}
}

type LogReader struct {
	srv         *drive.Service
	manifestID  string
	expectedSeq int
}

func (r *LogReader) Start(ctx context.Context, ch chan<- []byte) {
	ticker := time.NewTicker(PollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			m, err := r.srv.Files.Get(r.manifestID).Fields("appProperties").Do()
			if err != nil || m.AppProperties == nil {
				continue
			}

			availableSeqs := make(map[int]string)
			minSeq := -1
			for k, v := range m.AppProperties {
				if len(k) > 1 && k[0] == 's' {
					if s, err := strconv.Atoi(k[1:]); err == nil {
						availableSeqs[s] = v
						if minSeq == -1 || s < minSeq {
							minSeq = s
						}
					}
				}
			}

			if len(availableSeqs) == 0 {
				continue
			}

			if r.expectedSeq < minSeq {
				log.Printf("READER: LAG! Expected SEQ=%d, Min Available=%d. Jumping...", r.expectedSeq, minSeq)
				r.expectedSeq = minSeq
			}

			fileID, exists := availableSeqs[r.expectedSeq]
			if !exists {
				continue
			}

			var data []byte
			retryForever(fmt.Sprintf("Download Seg %d", r.expectedSeq), func() error {
				resp, err := r.srv.Files.Get(fileID).Download()
				if err != nil {
					return err
				}
				defer resp.Body.Close()
				data, err = io.ReadAll(resp.Body)
				return err
			})

			log.Printf("READER: Downloaded SEQ=%d (%d bytes)", r.expectedSeq, len(data))

			ch <- data
			r.expectedSeq++
		}
	}
}

@RPRX
Member

RPRX commented Jan 28, 2026

Also, only Google Drive and OneDrive support revisions; for Yandex Disk and S3 stores we would have to create multiple files, so at least two modes are needed.

None of them support "start downloading before the upload has finished", so true streaming isn't possible. If we later find a service that can, there could be a third mode, e.g. for XVIDEO.

And of course there could be a fourth mode, things like "changing a username", where you only have a couple of slots and must wait for the peer to finish reading before writing anything new; that's for later.

@RPRX
Member

RPRX commented Jan 28, 2026

@Kc2353
Kc2353 commented Jan 29, 2026

I just prodded Qwen, and it told me Alibaba's OSS has an upload/download mode for log-style files whose content keeps growing but whose size stays modest, called "append upload / range download": in short, you can upload only the newly appended part of a file, and you can specify the start and end positions of a download.
I'd like to know whether over there they have this kind of professional cloud storage that enterprises can also use, rather than the consumer-grade, idiot-proof drives individuals normally use (Baidu Netdisk and the like).
Or one could check whether those services' APIs offer this kind of append upload and range download; I already prodded Gemini, and the Google Drive API does not have it.
Also, Alibaba's "range upload/download" apparently cannot be enabled together with versioning.
I just prodded some other AIs, and it seems this feature is pretty much unique to Alibaba (Azure apparently can do it too), so it may not be worth prioritizing; simulating a continuous upload with lots of small files is also workable.

There is a continuous data stream, but the data volume is not very large. How can this be uploaded and downloaded at the same time? And can the receiver download only the newly added part of the file? Great! The scenario you describe is very typical: a continuous data stream + upload-while-downloading + the receiver pulling only the new part. This is the core requirement of many real-time logging, IoT reporting, monitoring, and even AI training-data sync systems.

The good news: Alibaba Cloud OSS fully supports this pattern, and there are several elegant ways to implement it. Let me break down the most suitable one.

✅ Best recommendation: Append Object (append upload) + Range download

🎯 Why it fits your case

  • The data is continuously appended (no random writes)
  • The data volume is "not very large" (Append Object caps a single file at 5 GB, which is enough)
  • The receiver wants only the new part → achievable with Range requests!

🔧 How it works

  1. Upload side: use Append Upload
    Whenever new data arrives, call AppendObject to append it to the end of the same file.

Python example (aliyun-oss-python-sdk)
from oss2 import Auth, Bucket

auth = Auth('', '')
bucket = Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'your-bucket')

position = 0  # initial position is 0
log_data = "new log line\n"

# first append
result = bucket.append_object('stream.log', position, log_data)
position = result.next_position  # remember where to write next

# subsequent appends
new_data = "another line\n"
bucket.append_object('stream.log', position, new_data)

✅ After each append the file is immediately readable, and the content accumulates.

  2. Download side: remember the last read position and use Range to pull only the new data

The receiver maintains a read byte offset, e.g. it last read up to byte 1024.

On the next request, use HTTP Range: bytes=1024- to tell OSS: "give me everything from byte 1024 onward".

# download the new part
offset = 1024  # position read up to last time
headers = {'Range': f'bytes={offset}-'}
result = bucket.get_object('stream.log', headers=headers)

new_content = result.read()
print(new_content.decode('utf-8'))
offset += len(new_content)  # then advance the offset

✅ This gives incremental pulls, without re-downloading data you already have!

📌 Key advantages
Feature | Supported
Read while writing | ✅ readable immediately after each append
Download only new data | ✅ via Range
Consistent ordering | ✅ Append guarantees order
Low latency | ✅ no merge/wait step

⚠️ Caveats

  1. Append Object limits:

    • a single file is capped at 5 GB
    • content already written cannot be modified or deleted
    • concurrent appends are not supported (writes must be serial, in position order)
  2. What if you exceed 5 GB?

    • rotate files: stream_20260129_001.log, stream_20260129_002.log...
    • the receiver watches the file listing for changes (OSS event notifications can help)
  3. Performance tips:

    • don't append every single log line (the network overhead is high); buffer and append in batches (e.g. once per second)
    • the receiver can add a small polling interval (e.g. pull every 5 seconds) to avoid hammering the API

🔁 Alternative (if Append doesn't fit)

If your data is not strictly append-only (e.g. random writes, or concurrent writers), consider:

Plan B: chunked upload + metadata recording the uploaded ranges

  • upload each "data block" as an independent Object (e.g. chunk_001, chunk_002)
  • keep a manifest file listing the chunks uploaded so far
  • the receiver diffs against the manifest and downloads only the new chunks

But this is more complex and suited to segmenting large files, not really your "small data stream".

💡 Practical suggestion

Say you are collecting IoT device logs in Hunan:

device → (Append to OSS: device_123.log) →
backend service Range-downloads the new log lines every 10 seconds → real-time analysis/alerting

All you need is:

  1. the upload side maintains position
  2. the download side maintains last_offset
  3. OSS's Append + Range capabilities

If you like, I can write you a complete upload+download demo (Python / Node.js / C# all work), or help design the rotation strategy 😊

Are you building an IoT system, a logging system, or something else?

@RememberOurPromise
https://developers.cloudflare.com/api/

api.cloudflare.com

Is it usable in Iran and Russia?

I hope we never have to use it.

@MoRanYue
(I hope S3 buckets get first-class support in the XDrive transport; then we could use Cloudflare R2.)

@RPRX
Member

RPRX commented Jan 31, 2026

I know CF has this, apparently even with a free tier, but I didn't bring it up. Why are you all so set on squeezing the cyber-philanthropist CF?

Baidu Netdisk won't work, as it apparently can't be reached from outside China; other domestic cloud drives might, though their API support is probably a mess.

Alibaba Cloud and other domestic clouds should be reachable from abroad, but they might ban your account. I'd rather stay away from domestic services; we're nowhere near needing them yet.

@RPRX
Member

RPRX commented Jan 31, 2026

@paqx Has work on XDRIVE started yet?
