Skip to content

Conversation

@FuYouJ
Copy link
Contributor

@FuYouJ FuYouJ commented Aug 26, 2024

Purpose of this pull request

image

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "Currently SPARK/FLINK do not support multiple table read")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark already supports multiple tables. You need to enable test case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@FuYouJ FuYouJ requested a review from Carl-Zhou-CN August 30, 2024 01:59
@Hisoka-X Hisoka-X linked an issue Aug 30, 2024 that may be closed by this pull request
3 tasks
return getDocsWithTransformDate(source, index, Collections.emptyList());
}

//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Map<String, BasicTypeDefine<EsType>> esFieldType =
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
esRestClient.getFieldTypeMapping(index, source);
esRestClient.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a risk that the connection is not closed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented Closeable for EsRestClient and extracted the code into a separate method to ensure that resources are closed even if an exception occurs

    private Map<String, BasicTypeDefine<EsType>> getFieldTypeMapping(
            String index, List<String> source) {
        // EsRestClient#getFieldTypeMapping may throw runtime exception
        // so here we use try-resources-finally to close the resource
        try (EsRestClient esRestClient = EsRestClient.createInstance(connectionConfig)) {
            return esRestClient.getFieldTypeMapping(index, source);
        }
    }

esRestClient.getFieldTypeMapping("st_index4", Lists.newArrayList());
Thread.sleep(2000);
esRestClient.getFieldTypeMapping(indexName, Lists.newArrayList());
Thread.sleep(5000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can unify a parameter

Copy link
Contributor Author

@FuYouJ FuYouJ Aug 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now all refresh waiting times use static variables.

private static final long INDEX_REFRESH_MILL_DELAY = 5000L;

@FuYouJ FuYouJ requested a review from Carl-Zhou-CN August 31, 2024 07:28
Carl-Zhou-CN
Carl-Zhou-CN previously approved these changes Sep 1, 2024
Copy link
Member

@Carl-Zhou-CN Carl-Zhou-CN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

tls_verify_hostname = false

index = "multi_source_write_test_index"
index_type = "st"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@FuYouJ FuYouJ Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on your suggestion, I modified my e2e test.
Now it reads different fields from different indices and writes them into different target indices.

env {
  parallelism = 1
  job.mode = "BATCH"
  #checkpoint.interval = 10000
}

source {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index_list = [
       {
           index = "read_filter_index1"
           query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
           source = [
           c_map,
           c_array,
           c_string,
           c_boolean,
           c_tinyint,
           c_smallint,
           c_bigint,
           c_float,
           c_double,
           c_decimal,
           c_bytes,
           c_int,
           c_date,
           c_timestamp,
           c_null
           ]
           array_column = {
           c_array = "array<tinyint>"
           }
       }
       {
           index = "read_filter_index2"
           query = {"range": {"c_int2": {"gte": 10, "lte": 20}}}
           source = [
           c_int2,
           c_null2,
           c_date2
           ]

       }

    ]

  }
}

transform {
}

sink {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false

    index = "${table_name}_copy"
    index_type = "st"
    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
    "data_save_mode"="APPEND_DATA"
  }
}

Comment on lines 40 to 76
c_map,
c_array,
c_string,
c_boolean,
c_tinyint,
c_smallint,
c_bigint,
c_float,
c_double,
c_decimal,
c_bytes,
c_int,
c_date,
c_timestamp
]
array_column = {
c_array = "array<tinyint>"
}
}
{
index = "read_index2"
query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
source = [
c_map,
c_array,
c_string,
c_boolean,
c_tinyint,
c_smallint,
c_bigint,
c_float,
c_double,
c_decimal,
c_bytes,
c_int,
c_date,
c_timestamp
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should define indexes using different fields

e.g

index_1: x, y, z...
index_2: a,b,c...

Copy link
Contributor Author

@FuYouJ FuYouJ Sep 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on your suggestion, I modified my e2e test.
Now it reads different fields from different indices and writes them into different target indices.

env {
  parallelism = 1
  job.mode = "BATCH"
  #checkpoint.interval = 10000
}

source {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index_list = [
       {
           index = "read_filter_index1"
           query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
           source = [
           c_map,
           c_array,
           c_string,
           c_boolean,
           c_tinyint,
           c_smallint,
           c_bigint,
           c_float,
           c_double,
           c_decimal,
           c_bytes,
           c_int,
           c_date,
           c_timestamp,
           c_null
           ]
           array_column = {
           c_array = "array<tinyint>"
           }
       }
       {
           index = "read_filter_index2"
           query = {"range": {"c_int2": {"gte": 10, "lte": 20}}}
           source = [
           c_int2,
           c_null2,
           c_date2
           ]

       }

    ]

  }
}

transform {
}

sink {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false

    index = "${table_name}_copy"
    index_type = "st"
    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
    "data_save_mode"="APPEND_DATA"
  }
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(HOSTS, INDEX)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it compatible with old versions?

Copy link
Contributor Author

@FuYouJ FuYouJ Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fully compatible with previous configurations, in the ElasticsearchSource code, the first if step is to check for the existence of index_ist. If it exists, it should be parsed synchronously as multiple tables, and if it does not exist, it should be parsed synchronously as a single table

    public ElasticsearchSource(ReadonlyConfig config) {
        this.connectionConfig = config;
        boolean multiSource = config.getOptional(SourceConfig.INDEX_LIST).isPresent();
        boolean singleSource = config.getOptional(SourceConfig.INDEX).isPresent();
        if (multiSource && singleSource) {
            log.warn(
                    "Elasticsearch Source config warn: when both 'index' and 'index_list' are present in the configuration, only the 'index_list' configuration will take effect");
        }
        if (!multiSource && !singleSource) {
            throw new ElasticsearchConnectorException(
                    ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_01,
                    ElasticsearchConnectorErrorCode.SOURCE_CONFIG_ERROR_01.getDescription());
        }
        if (multiSource) {
            this.sourceConfigList = createMultiSource(config);
        } else {
            this.sourceConfigList = Collections.singletonList(parseOneIndexQueryConfig(config));
        }
    }

@hailin0
Copy link
Member

hailin0 commented Sep 3, 2024

What will happen if I configure index & index_list connector at the same time

@FuYouJ
Copy link
Contributor Author

FuYouJ commented Sep 3, 2024

What will happen if I configure index & index_list connector at the same time

The program will print a warning log to tell the user the configured processing priority。

f (multiSource && singleSource) {
            log.warn(
                    "Elasticsearch Source config warn: when both 'index' and 'index_list' are present in the configuration, only the 'index_list' configuration will take effect");
        }

Copy link
Member

@hailin0 hailin0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks @FuYouJ

Waiting for ci passed

Copy link
Member

@Carl-Zhou-CN Carl-Zhou-CN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@Carl-Zhou-CN
Copy link
Member

Thank you for your contribution @FuYouJ

@Carl-Zhou-CN Carl-Zhou-CN merged commit 29fbeb2 into apache:dev Sep 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature][Elastic search] Support multi-table source feature

3 participants