
Conversation

@proletarians (Contributor):

The goal is to extend flink-cdc with the capability to invoke AI models during the data stream processing workflow, with a particular focus on supporting array data structures. This feature will allow users to easily integrate embedding models to handle array-based data, such as lists of text, numeric data, or other multi-dimensional arrays, within their Flink jobs.

@yuxiqian (Member) left a comment:

Hi @proletarians, thanks for working on this huge PR! However, I'm a little confused by some of this code.

It seems you have implemented a new interface (with ModelDef) allowing users to configure model parameters without creating a new model. But I also see some functions in SystemFunctionUtils with hard-coded API keys and endpoints. Apart from some obviously unwanted test classes, there are also some odd things in the flink-cdc-openai-udf package, where the UDF functions there don't seem configurable, and temporary API endpoints and keys are hard-coded inside. How are users supposed to use them?

Could you please kindly share your design concepts for this feature?

Also, I noticed that there are no test cases at all (except those written by umeshdangat). Though it's not easy to test real AI models in CI, I think some kind of mocking test is required at least.

If I understand correctly, we should have some "pre-defined" AI model handling functions that convert input to vectors or strings, and users should be able to pass their own API keys and endpoints into them to invoke different services. Given that there is no test coverage right now, why do these hard-coded API keys and endpoint URLs exist?
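To illustrate the design the reviewer describes, here is a minimal, self-contained sketch of a model function whose endpoint and API key come entirely from user-supplied configuration rather than constants. All names here (ConfigurableEmbeddingSketch, the "openai.host"/"openai.apikey" parameter keys) are hypothetical illustrations, not the actual flink-cdc API.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: an embedding-style function that is constructed from
// user-supplied model parameters instead of hard-coded keys and endpoints.
public class ConfigurableEmbeddingSketch {

    private final String endpoint;
    private final String apiKey;

    public ConfigurableEmbeddingSketch(Map<String, String> params) {
        // Fail fast if the user forgot to configure the model block.
        this.endpoint =
                Objects.requireNonNull(params.get("openai.host"), "openai.host must be set");
        this.apiKey =
                Objects.requireNonNull(params.get("openai.apikey"), "openai.apikey must be set");
    }

    // Placeholder for the real HTTP call to the embedding service; it only
    // demonstrates that the configured endpoint is what gets used.
    public String eval(String input) {
        return "embedding(" + endpoint + ", " + input + ")";
    }
}
```

With this shape, a mocking test can instantiate the function with a fake endpoint and assert on its behavior without ever contacting a real AI service, which addresses the CI-testability concern above.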

As for code style, please run mvn spotless:check to ensure all rules have been enforced before committing. If these requirements are not met, CI will refuse to compile your code. Some required changes include:

  • Comments should not be written in Chinese.
  • License headers and JavaDocs have standard formats; do not change or alter them. For newly created classes, make sure they have been properly added.
  • Wildcard imports (like import java.*) and omitting if-block braces are not allowed.
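For reference, the last rule can be illustrated with a small before/after sketch (class and method names here are made up for illustration):

```java
// Disallowed style (shown as comments so this file stays compilable):
//   import java.util.*;            // wildcard import
//   if (ready) out.add("ok");     // if-block without braces

// Allowed style: explicit imports and braced if-blocks.
import java.util.ArrayList;
import java.util.List;

public class StyleExample {
    public static List<String> collect(boolean ready) {
        List<String> out = new ArrayList<>();
        if (ready) {
            out.add("ok");
        }
        return out;
    }
}
```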

* See the License for the specific language governing permissions and
* limitations under the License.
*/

Member:

License headers, JavaDocs and comments should be kept.

import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

/** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */
Member:

Ditto.

private static final String ROUTE_KEY = "route";
private static final String TRANSFORM_KEY = "transform";
private static final String PIPELINE_KEY = "pipeline";
private static final String MODEL_KEY = "models";
Member:

route, transform and user-defined-function are not in plural form. Maybe use model for consistency?

Comment on lines +295 to +304
pipelineConfigNode
        .fields()
        .forEachRemaining(
                entry -> {
                    String key = entry.getKey();
                    JsonNode value = entry.getValue();
                    if (!key.equals(MODEL_KEY)) {
                        pipelineConfigMap.put(key, value.asText());
                    }
                });
Member:

The models configuration could be parsed and removed in the YamlPipelineDefinitionParser#parse method in advance, so there's no need to iterate over the configuration map again here.
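The suggested order of operations (extract the model definitions first, then copy the remaining entries) can be sketched with plain maps. ModelKeyExtraction and its method are illustrative stand-ins for the parser code, not the actual flink-cdc classes:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ModelKeyExtraction {
    static final String MODEL_KEY = "models";

    // Remove the model definitions up front; the remaining entries can then
    // be copied into the pipeline config without re-checking the key on
    // every iteration.
    public static Map<String, String> extractPipelineConfig(Map<String, Object> pipelineNode) {
        Map<String, Object> remaining = new LinkedHashMap<>(pipelineNode);
        remaining.remove(MODEL_KEY); // handled separately, e.g. parsed into ModelDef objects
        Map<String, String> config = new HashMap<>();
        remaining.forEach((key, value) -> config.put(key, String.valueOf(value)));
        return config;
    }
}
```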


// Pipeline configs are optional
List<ModelDef> modelDefs = new ArrayList<>();
JsonNode modelsNode = pipelineDefJsonNode.get(PIPELINE_KEY).get(MODEL_KEY);
Member:

Suggested change
JsonNode modelsNode = pipelineDefJsonNode.get(PIPELINE_KEY).get(MODEL_KEY);
JsonNode modelsNode = pipelineDefJsonNode.get(PIPELINE_KEY).remove(MODEL_KEY);

Accessing and removing MODEL_KEY could be done here at once.

}

try {
// 确保 OpenAiEmbeddingModel 已初始化 ("ensure the OpenAiEmbeddingModel has been initialized")
Member:

Remove Chinese comments.

try {
// 确保 OpenAiEmbeddingModel 已初始化 ("ensure the OpenAiEmbeddingModel has been initialized")
if (embeddingModel == null) {
initializeOpenAiEmbeddingModel(apiKey, "https://api.openai.com/v1/");
@yuxiqian (Member), Oct 14, 2024:

Is the endpoint hard-coded here? Why do we need this function, and why pass apiKeys manually? Shouldn't these be configured in the models: rule block?

Comment on lines +37 to +61
public class ModelUdf extends ScalarFunction {
    private final ModelDef model;

    public ModelUdf(ModelDef model) {
        this.model = model;
    }

    // UDF 的主要方法,处理输入并返回结果 ("the UDF's main method: processes the input and returns a result")
    public String eval(String input) {
        // 这里实现调用模型 API 的逻辑 ("implement the model-API invocation logic here")
        // 使用 model.getHost() 和 model.getApiKey() 来访问 API ("use model.getHost() and model.getApiKey() to access the API")
        // 这只是一个示例实现,实际逻辑需要根据具体的 API 调用方式来编写 ("this is only a sample implementation; the real logic depends on the concrete API invocation style")
        return "Embedding for: " + input;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        // 初始化逻辑,如建立API连接等 ("initialization logic, e.g. establishing the API connection")
    }

    @Override
    public void close() throws Exception {
        // 清理逻辑,如关闭API连接等 ("cleanup logic, e.g. closing the API connection")
    }
}
Member:

What is this class for? It seems completely useless.

Member:

Put this in documentation if this was meant to be an implementation reference.

<module>flink-cdc-runtime</module>
<module>flink-cdc-e2e-tests</module>
<module>flink-cdc-pipeline-udf-examples</module>
<module>flink-cdc-openai-udf</module>
Member:

Maybe putting it as a part of SystemFunctionUtils is enough? Creating a new top-level package seems too much.

new SqlFunction(
        "GET_EMBEDDING",
        SqlKind.OTHER_FUNCTION,
        ReturnTypes.explicit(SqlTypeName.VARCHAR), // 返回类型是数组 ("the return type is an array")
Member:

SqlTypeName.VARCHAR), // 返回类型是数组 ("the return type is an array")

This comment seems a little inaccurate (the declared return type is VARCHAR, not an array). Also, after we have user-defined models, are these definitions still necessary?

Comment on lines +26 to +31
private static final Logger LOG = LoggerFactory.getLogger(ModelUdf.class);
private static final String DEFAULT_API_KEY =
"sk-WegHEuogRpIyRSwaF5Ce6fE3E62e459dA61eFaF6CcF8C79b";
private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
private static final int DEFAULT_TIMEOUT_SECONDS = 30;
private static final String DEFAULT_BASE_URL = "https://api.gpt.ge/v1/";
@yuxiqian (Member), Oct 14, 2024:

Providing these "default" API keys and endpoints isn't meaningful. Request tokens might run out, and endpoints might expire.

Comment on lines +93 to +95
private List<UdfDef> convertModelsToUdfs(List<ModelDef> models) {
    return models.stream().map(this::modelToUdf).collect(Collectors.toList());
}
Member:

This conversion is quite worrying, since ModelDef obviously carries more information (the params), and the conversion cannot be done without hacky approaches like pasting the params into the name field.

Modifying the addUdfFunctions function so that it accepts a more universal data structure (instead of a Name-Classpath Tuple2) makes more sense to me.
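One way the suggested "more universal data structure" could look: a descriptor that generalizes the (name, classpath) pair and optionally carries model parameters, so no information is squeezed into the name field. UdfDescriptor and its fields are hypothetical names, not actual flink-cdc types:

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical descriptor generalizing the (name, classpath) Tuple2 so that
// model-backed functions can carry their parameters directly.
public class UdfDescriptor {
    private final String name;
    private final String classpath;
    private final Map<String, String> modelParams; // empty for plain UDFs

    public UdfDescriptor(String name, String classpath, Map<String, String> modelParams) {
        this.name = name;
        this.classpath = classpath;
        this.modelParams = modelParams == null ? Collections.emptyMap() : modelParams;
    }

    // A registration routine can branch on this instead of parsing the name.
    public boolean isModelBacked() {
        return !modelParams.isEmpty();
    }

    public String getName() {
        return name;
    }

    public String getClasspath() {
        return classpath;
    }

    public Map<String, String> getModelParams() {
        return modelParams;
    }
}
```

With this shape, both plain UDFs and ModelDef-derived functions go through the same registration path, and the params survive the conversion intact.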

@ruanhang1993 (Contributor):

Provided by #3753. Closing this PR.
