读取csv大文件数据

AI摘要
该内容为一段PHP代码,属于【知识分享】。代码定义了一个用于读取CSV文件、处理数据并存入MongoDB数据库的PHP命令行方法,包含文件路径解析、列筛选、数据清洗(如JSON解析、字段校验)和异常数据分类存储等逻辑。
/**
     * 读取本地CSV文件,取指定列为有效数据
     * 用法: php cli.php run/tasks/once readCsvFile 文件路径 列索引1,列索引2,...
     * 示例: php cli.php run/tasks/once readCsvFile /data/file/data.csv 0,1,3
     * @param array $params [0]=文件路径(相对BASE_PATH或绝对路径), [1]=有效列索引逗号分隔(如0,1,2), 可选[2]=1跳过首行表头
     * @return array 有效数据列表,每行为关联数组
     */
    public function readCsvFileAction($params = [])
    {
        $Mongodb = DiUtils::getDi()->get('Mongodb');
        if (empty($params[0])) {
            DiUtils::getLog()->error('readCsvFile: 请提供CSV文件路径');
            echo "用法: php cli.php run/tasks/once readCsvFile 文件路径 列索引1,列索引2,... [是否跳过首行:1]\n";
            return [];
        }
        // $filePath = $params[0];
        $name=$params[0];
        $filePath = BASE_PATH.'/data/file/csv/'.$name.'.csv';
        if (substr($filePath, 0, 1) !== '/' && !preg_match('#^[A-Za-z]:[\\\\/]#', $filePath)) {
            $filePath = BASE_PATH . '/' . ltrim($filePath, '/');
        }
        if (!is_readable($filePath)) {
            DiUtils::getLog()->error('readCsvFile: 文件不存在或不可读', ['path' => $filePath]);
            echo "文件不存在或不可读: {$filePath}\n";
            return [];
        }
        $columnKeys = [];
        if (!empty($params[1])) {
            $columnKeys = array_map('intval', array_filter(explode(',', str_replace(' ', '', $params[1]))));
        }
        $skipHeader = !empty($params[2]) && (int)$params[2] === 1;
        $handle = fopen($filePath, 'r');
        if (!$handle) {
            DiUtils::getLog()->error('readCsvFile: 无法打开文件', ['path' => $filePath]);
            return [];
        }
        $validData = [];
        $lineNum = 0;
        $headerRow = null;
        $limitNum=20000;
        $i = 0;
        while (($row = fgetcsv($handle)) !== false) {
            $lineNum++;
            if ($skipHeader && $lineNum === 1) {
                $headerRow = $row;
                continue;
            }
            if (empty($columnKeys)) {
                $validData[] = $row;
                continue;
            }
            $item = [];
            foreach ($columnKeys as $idx) {
                $item[$idx] = $row[$idx] ?? '';
            }
            $data= $item[10];
            $data = json_decode($data,true);
            if (isset($data['Values']) && is_array($data['Values'])) {
                foreach ($data['Values'] as $key => $value) {
                    if (is_array($value) && count($value) === 1) {
                        $data['Values'][$key] = $value[0];
                    }
                }
            }
            $data = $data['Values'];
            if(empty($data)){
                continue;
            }
            if(!empty($data['clue_token'])){
                $Mongodb->insert($data,'changdu_all_users_ygh');
                continue;
            }
            if(empty($data['project_id'])|| $data['project_id'] == '__PROJECT_ID__'){
                //DiUtils::getLog()->info('project_id为空', $data);
                $data['error'] = 'project_id为空';
                //$Mongodb->insert($data,'changdu_all_users_ygh_error');
                continue;
            }
            if(!empty($data['customize_params'])){
                $data['error'] = '非巨量用户,customize_params有值';
                $Mongodb->insert($data,'changdu_all_users_ygh_error_cp');
                continue;
            }
            if(empty($data['click_id'])||$data['click_id'] == '__CLICKID__'){
                $data['error'] = 'click_id为空';
                $Mongodb->insert($data,'changdu_all_users_ygh_error_ci_null');
                //DiUtils::getLog()->info('adq用户', $data);
                continue;
            }
            if(strlen($data['click_id'])<50){
                $data['error'] = 'click_id长度大于50';
                $Mongodb->insert($data,'changdu_all_users_ygh_error_ci_lengh');
                //DiUtils::getLog()->info('adq用户', $data);
                continue;
            }
            $Mongodb->insert($data,'changdu_all_users');
        }
        fclose($handle);
        echo $name."读取完成".count($validData)."\n";

//        DiUtils::getLog()->info('readCsvFile: 读取完成', [
//            'path' => $filePath,
//            'total_lines' => count($validData),
//            'columns' => $columnKeys,
//        ]);
//        echo "读取完成,共 " . count($validData) . " 条有效数据\n";

    }
本作品采用《CC 协议》,转载必须注明作者和本文链接
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 3

刚好看到一个导入的包:zappzerapp/laravel-ingest,亮点:无论文件大小如何都能保持内存使用一致,文档地址:https://laravel-news.com/laravel-ingest,注意它要求PHP版本8.3以上

1周前 评论
Dcatplus-杨光 3天前
Dcatplus-杨光

你指的大文件csv,有多大? 5万行起吗?

3天前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!