[FLINK-36525] Support for AI Model Integration for Data Processing #3642
…ataFieldSerializer`, `DataTypeSerializer`, and `RowTypeSerializer` classes that causes NPE for certain RowTypes like nested row type
…ySegment for nested arrays
Hi @proletarians, thanks for working on this huge PR! However, I'm a little confused by some of this code.
It seems you have implemented a new interface (with ModelDef) that allows users to configure model parameters without creating a new model. But I also see some functions in SystemFunctionUtils with API keys and endpoints hard-coded. Apart from some obviously unwanted test classes, I also see some odd things in the flink-cdc-openai-udf package, where the UDF functions don't seem to be configurable, and temporary API endpoints and keys are hard-coded inside. How are users supposed to use them?
Could you please kindly share your design concepts for this feature?
Also, I noticed that there are no test cases (except those written by umeshdangat). Though it's not easy to test real AI models in CI, I think some kind of mocking test is required at least.
If I understand correctly, we should have some "pre-defined" AI model handling functions that convert input to vectors or strings, and users could pass their own API keys and endpoints into them to invoke different services. Considering there's no test coverage now, why do these hard-coded API keys and endpoint URLs exist?
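Since real model endpoints can't be hit in CI, injecting the network client as a plain function is one common way to get mock coverage. A minimal sketch, not taken from this PR — the `EmbeddingUdf` class, the injected client shape, and the fake vector values are all illustrative assumptions:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of a mockable embedding UDF: the remote call is delegated
// to an injected client, so tests can substitute a deterministic fake instead
// of a real API key and endpoint.
public class EmbeddingUdfMockTest {

    // Minimal stand-in for the UDF under test.
    static class EmbeddingUdf {
        private final Function<String, List<Double>> client;

        EmbeddingUdf(Function<String, List<Double>> client) {
            this.client = client;
        }

        public String eval(String input) {
            // Serialize the vector the same way regardless of which client produced it.
            return client.apply(input).toString();
        }
    }

    public static void main(String[] args) {
        // A deterministic fake client: no API key, no endpoint, CI-friendly.
        Function<String, List<Double>> fakeClient =
                text -> Arrays.asList((double) text.length(), 0.0, 1.0);

        EmbeddingUdf udf = new EmbeddingUdf(fakeClient);
        String result = udf.eval("hello");
        if (!result.equals("[5.0, 0.0, 1.0]")) {
            throw new AssertionError("unexpected embedding: " + result);
        }
        System.out.println(result);
    }
}
```

In a real test the fake would be swapped for the production HTTP client; the point is only that the UDF itself never constructs the client internally.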
As for code style, please run `mvn spotless:check` to ensure all rules have been enforced before committing. If these requirements are not met, CI will refuse to compile your code. Some required changes include:
- Comments should not be written in Chinese.
- License headers and JavaDocs have standard formats. Do not change or alter them. For newly created classes, make sure they have been properly added.
- Wildcard imports like `import java.*` and omitting if-block braces are not allowed.
```java
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
```
License headers, JavaDocs and comments should be kept.
```java
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

/** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */
```
Ditto.
```java
private static final String ROUTE_KEY = "route";
private static final String TRANSFORM_KEY = "transform";
private static final String PIPELINE_KEY = "pipeline";
private static final String MODEL_KEY = "models";
```
The `route`, `transform` and `user-defined-function` keys are not in plural form. Maybe use `model` for consistency?
```java
pipelineConfigNode
        .fields()
        .forEachRemaining(
                entry -> {
                    String key = entry.getKey();
                    JsonNode value = entry.getValue();
                    if (!key.equals(MODEL_KEY)) {
                        pipelineConfigMap.put(key, value.asText());
                    }
                });
```
The models configuration could be parsed and removed in the YamlPipelineDefinitionParser#parse method in advance, so there's no need to iterate the configuration map again here.
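The suggestion above can be sketched with plain maps standing in for Jackson's `ObjectNode` (the key names besides `models` are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: pull the "models" entry out of the pipeline config once,
// up front, so the later loop that copies pipeline options never has to
// special-case MODEL_KEY. ObjectNode#remove behaves the same way as Map#remove
// here: it returns the removed node, or null if the field is absent.
public class ModelsExtractionSketch {
    static final String MODEL_KEY = "models";

    public static void main(String[] args) {
        Map<String, Object> pipelineConfig = new LinkedHashMap<>();
        pipelineConfig.put("name", "example-pipeline");
        pipelineConfig.put("parallelism", "4");
        pipelineConfig.put(MODEL_KEY, "[{name: GET_EMBEDDING, ...}]");

        // Extract and remove in one step.
        Object modelsNode = pipelineConfig.remove(MODEL_KEY);
        if (modelsNode == null || pipelineConfig.containsKey(MODEL_KEY)) {
            throw new AssertionError("models entry was not extracted");
        }

        // The remaining entries can now be copied without any key filtering.
        System.out.println("models = " + modelsNode);
        System.out.println("config = " + pipelineConfig);
    }
}
```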
```java
// Pipeline configs are optional
List<ModelDef> modelDefs = new ArrayList<>();
JsonNode modelsNode = pipelineDefJsonNode.get(PIPELINE_KEY).get(MODEL_KEY);
```
```diff
- JsonNode modelsNode = pipelineDefJsonNode.get(PIPELINE_KEY).get(MODEL_KEY);
+ JsonNode modelsNode = pipelineDefJsonNode.get(PIPELINE_KEY).remove(MODEL_KEY);
```
Accessing and removing MODEL_KEY could be done here at once.
```java
}

try {
    // 确保 OpenAiEmbeddingModel 已初始化 ("make sure OpenAiEmbeddingModel is initialized")
```
Remove Chinese comments.
```java
try {
    // 确保 OpenAiEmbeddingModel 已初始化 ("make sure OpenAiEmbeddingModel is initialized")
    if (embeddingModel == null) {
        initializeOpenAiEmbeddingModel(apiKey, "https://api.openai.com/v1/");
```
Is the endpoint hard-coded here? Why do we need this function and have to pass apiKeys manually? Shouldn't these be configured in the `models:` rule block?
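For reference, the kind of configuration the reviewer seems to be describing might look roughly like this. All field names and values here are illustrative assumptions; the actual schema would be whatever ModelDef defines:

```yaml
pipeline:
  models:
    - name: GET_EMBEDDING                 # name that transform rules refer to
      host: https://api.openai.com/v1/     # endpoint comes from config, not code
      api-key: ${OPENAI_API_KEY}          # supplied by the user, never hard-coded
      model-name: text-embedding-ada-002
```

With something like this, SystemFunctionUtils would read the endpoint and key from the parsed model definition instead of embedding them in source.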
```java
public class ModelUdf extends ScalarFunction {
    private final ModelDef model;

    public ModelUdf(ModelDef model) {
        this.model = model;
    }

    // Main UDF method: processes the input and returns the result (comments translated from Chinese)
    public String eval(String input) {
        // Implement the model API invocation logic here
        // Use model.getHost() and model.getApiKey() to access the API
        // This is only a sample implementation; the real logic depends on the concrete API call
        return "Embedding for: " + input;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        // Initialization logic, e.g. establishing the API connection
    }

    @Override
    public void close() throws Exception {
        // Cleanup logic, e.g. closing the API connection
    }
}
```
What is this class for? Seems it's completely useless.
Put this in documentation if this was meant to be an implementation reference.
```xml
<module>flink-cdc-runtime</module>
<module>flink-cdc-e2e-tests</module>
<module>flink-cdc-pipeline-udf-examples</module>
<module>flink-cdc-openai-udf</module>
```
Maybe putting it as a part of SystemFunctionUtils is enough? Creating a new top-level package seems too much.
```java
new SqlFunction(
        "GET_EMBEDDING",
        SqlKind.OTHER_FUNCTION,
        ReturnTypes.explicit(SqlTypeName.VARCHAR), // 返回类型是数组 ("the return type is an array")
```
> `SqlTypeName.VARCHAR), // 返回类型是数组 ("the return type is an array")`

This comment seems a little untrue, since the declared return type is `VARCHAR`, not an array. Also, after we have user-defined models, are these definitions still necessary?
```java
private static final Logger LOG = LoggerFactory.getLogger(ModelUdf.class);
private static final String DEFAULT_API_KEY =
        "sk-WegHEuogRpIyRSwaF5Ce6fE3E62e459dA61eFaF6CcF8C79b";
private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
private static final int DEFAULT_TIMEOUT_SECONDS = 30;
private static final String DEFAULT_BASE_URL = "https://api.gpt.ge/v1/";
```
Providing these "default" API keys and endpoints isn't meaningful: request tokens might run out, and endpoints might expire.
```java
private List<UdfDef> convertModelsToUdfs(List<ModelDef> models) {
    return models.stream().map(this::modelToUdf).collect(Collectors.toList());
}
```
This conversion is quite worrying, since ModelDef obviously carries more information (the params), and the conversion could not be done without some hacky approach like pasting the params into the name field.
Modifying the addUdfFunctions function so that it accepts a more universal data structure (instead of a Name/Classpath Tuple2) makes more sense to me.
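The "more universal data structure" suggested above could be sketched as a descriptor that carries parameters as structured data. Everything here is a hypothetical illustration; field names and classpaths are not from the PR:

```java
import java.util.Collections;
import java.util.Map;

// Illustrative sketch: a UDF descriptor that subsumes the (name, classpath)
// Tuple2 but can also carry model parameters, so nothing has to be smuggled
// through the name field.
public class UdfDescriptorSketch {

    static final class UdfDescriptor {
        final String name;
        final String classpath;
        final Map<String, String> parameters; // empty for plain UDFs, populated for models

        UdfDescriptor(String name, String classpath, Map<String, String> parameters) {
            this.name = name;
            this.classpath = classpath;
            this.parameters = parameters;
        }
    }

    public static void main(String[] args) {
        // A classic UDF carries no parameters...
        UdfDescriptor plain =
                new UdfDescriptor("format_ts", "com.example.FormatTs", Collections.emptyMap());
        // ...while a model-backed function keeps its params as structured data.
        UdfDescriptor model =
                new UdfDescriptor(
                        "GET_EMBEDDING",
                        "org.example.ModelUdf",
                        Map.of("host", "https://api.openai.com/v1/",
                               "model-name", "text-embedding-ada-002"));

        if (!plain.parameters.isEmpty()) throw new AssertionError();
        if (!"text-embedding-ada-002".equals(model.parameters.get("model-name"))) {
            throw new AssertionError();
        }
        System.out.println(model.name + " -> " + model.parameters);
    }
}
```

An `addUdfFunctions(List<UdfDescriptor>)` overload could then treat models and plain UDFs uniformly while still exposing the params where needed.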
Superseded by #3753. Closing this PR.
The goal is to extend flink-cdc with the capability to invoke AI models during the data stream processing workflow, with a particular focus on supporting array data structures. This feature will allow users to easily integrate embedding models to handle array-based data, such as lists of text, numeric data, or other multi-dimensional arrays, within their Flink jobs.