feat(services/hdfs_native): implement write/read/list methods #5617
Xuanwo merged 45 commits into apache:main
Conversation
@Xuanwo Hi, where can I add test cases for the newly submitted code?

OpenDAL performs behavior tests on all services. Please check https://github.com/apache/opendal/blob/main/core/tests/behavior/README.md for more details.

I have a question: these methods were not actually implemented before, so how did the test cases pass?

I have another question: I can't reproduce the CI failure when I run clippy locally. Do you have any troubleshooting suggestions?
You could check the behavior test code in opendal/core/tests/behavior/async_read.rs (Lines 31 to 37 in 39ce403). If a service hasn't implemented those functionalities, they won't be tested. So in this PR, you should enable those capabilities.
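The capability gating described above can be sketched in plain Rust. The `Capability` struct below is a standalone stand-in modeled loosely on OpenDAL's, not the real API; the field names and the skip logic are illustrative assumptions:

```rust
// Standalone sketch of capability-gated behavior tests. `Capability` here
// mirrors the spirit of OpenDAL's capability flags but is not the real type.
#[derive(Default, Clone, Copy)]
struct Capability {
    read: bool,
    write: bool,
    list: bool,
}

// The behavior test suite skips read tests entirely when the service
// does not advertise the `read` capability.
fn should_run_read_tests(cap: Capability) -> bool {
    cap.read
}

fn main() {
    // Before this PR the service left its capabilities off, so the
    // read/write/list behavior tests were silently skipped...
    let before = Capability::default();
    assert!(!should_run_read_tests(before));

    // ...and enabling them makes the suite actually exercise the new code.
    let after = Capability { read: true, write: true, list: true };
    assert!(should_run_read_tests(after));
    println!("read tests enabled: {}", should_run_read_tests(after));
}
```

This is why the previously unimplemented methods "passed" CI: the tests were never run for them.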
Have you checked the command used by the CI? Maybe it differs from what you're using locally, or the toolchain versions might be different.
Thank you very much. I have enabled the relevant configuration according to your suggestion.
@Xuanwo @meteorgan CI passed. PTAL |
Xuanwo left a comment:
Let's add a behavior test for this service.
We can add a new dir called hdfs_native under https://github.com/apache/opendal/tree/main/.github/services/:
And inside it, we build something similar to https://github.com/apache/opendal/blob/main/.github/services/hdfs/hdfs_cluster/action.yml
Just change all OPENDAL_HDFS_xx to OPENDAL_HDFS_NATIVE_xx.
    let p = build_rooted_abs_path(&self.root, path);
    let l = HdfsNativeLister::new(p, self.client.clone());
    Ok((RpList::default(), Some(l)))
    let iter = self.client.list_status_iter(path, true);
Hi, please use OpList::recursive instead of always using true.
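The reviewer's point is to pass the caller's option through rather than hardcoding `true`. A standalone sketch (the `OpList` type here is a stand-in for OpenDAL's real args type, not its actual definition):

```rust
// Standalone sketch of honoring the caller's `recursive` option instead of
// always passing `true` to the listing call. `OpList` is a stand-in type.
#[derive(Default)]
struct OpList {
    recursive: bool,
}

impl OpList {
    fn recursive(&self) -> bool {
        self.recursive
    }
}

fn list_status(args: &OpList) -> &'static str {
    // Forward the flag rather than unconditionally descending into
    // subdirectories, so non-recursive list requests behave correctly.
    if args.recursive() {
        "recursive listing"
    } else {
        "single-level listing"
    }
}

fn main() {
    assert_eq!(list_status(&OpList::default()), "single-level listing");
    assert_eq!(list_status(&OpList { recursive: true }), "recursive listing");
    println!("ok");
}
```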
    impl oio::Read for HdfsNativeReader {
        async fn read(&mut self) -> Result<Buffer> {
            todo!()
    let bytes: Bytes = self.f.read(self.size).await.map_err(parse_hdfs_error)?;
This will read all the data into memory at once. Please use read_range_stream to get a bytes stream and yield bytes every time the user calls read.
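The memory concern can be shown with a standalone synchronous sketch: instead of one call that pulls the whole file into a buffer, yield a bounded chunk per call, the way a bytes stream would. The `ChunkedReader` type and chunk size here are illustrative, not OpenDAL or hdfs-native API:

```rust
use std::io::Read;

// Standalone sketch: yield fixed-size chunks per call instead of reading
// the whole file into memory at once. `ChunkedReader` is an illustration,
// not the real hdfs-native streaming API.
struct ChunkedReader<R: Read> {
    inner: R,
    chunk: usize,
}

impl<R: Read> ChunkedReader<R> {
    // Returns Ok(None) at end of stream; otherwise at most `chunk` bytes.
    fn next_chunk(&mut self) -> std::io::Result<Option<Vec<u8>>> {
        let mut buf = vec![0u8; self.chunk];
        let n = self.inner.read(&mut buf)?;
        if n == 0 {
            return Ok(None);
        }
        buf.truncate(n);
        Ok(Some(buf))
    }
}

fn main() -> std::io::Result<()> {
    let data: &[u8] = b"hello hdfs native reader";
    let mut r = ChunkedReader { inner: data, chunk: 8 };
    let mut total = 0;
    while let Some(chunk) = r.next_chunk()? {
        // Only `chunk` bytes are resident at a time, however large the file is.
        total += chunk.len();
    }
    assert_eq!(total, data.len());
    println!("read {} bytes in chunks of 8", total);
    Ok(())
}
```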
    async fn write(&mut self, _bs: Buffer) -> Result<()> {
        todo!()
    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
        while bs.has_remaining() {
Bytes::copy_from_slice will copy the entire buffer on every write, which adds significant cost here. How about this:
    while let Some(bs) = bs.next() {
        self.f.write(bs).await
    }
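The suggestion can be illustrated with a standalone sketch: drain the buffer chunk by chunk, handing each chunk to the writer without copying the whole buffer first. `Buffer` below is a stand-in for OpenDAL's non-contiguous buffer type, not its real definition:

```rust
use std::collections::VecDeque;

// Standalone sketch of the suggested write loop: drain the buffer chunk by
// chunk instead of flattening it with a full copy up front.
// `Buffer` is a stand-in for OpenDAL's non-contiguous buffer type.
struct Buffer {
    chunks: VecDeque<Vec<u8>>,
}

impl Buffer {
    // Pop the next chunk, transferring ownership without copying the rest.
    fn next(&mut self) -> Option<Vec<u8>> {
        self.chunks.pop_front()
    }
}

// Synchronous analogue of: while let Some(bs) = bs.next() { self.f.write(bs).await }
fn write_all(out: &mut Vec<u8>, mut bs: Buffer) {
    while let Some(chunk) = bs.next() {
        out.extend_from_slice(&chunk);
    }
}

fn main() {
    let buf = Buffer {
        chunks: VecDeque::from(vec![b"hello ".to_vec(), b"hdfs".to_vec()]),
    };
    let mut sink = Vec::new();
    write_all(&mut sink, buf);
    assert_eq!(sink, b"hello hdfs");
    println!("{}", String::from_utf8(sink).unwrap());
}
```

Each iteration moves one chunk out of the buffer, so the cost is proportional to the data actually written rather than paying an extra full-buffer copy.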
The error doesn't seem to be related to my submission. I don't quite understand why this error is reported. Can you give me some suggestions? @Xuanwo @meteorgan
Yes, it's not related. A new Rust version has been released.
Hi, @zhaohaidao, it looks like you are blocked here. Would you like to share what went wrong?
For an error like:

    ---- behavior::test_blocking_write_with_append_returns_metadata ----
    test panicked: append to an existing file must success: AlreadyExists (persistent) at write => /tmp/opendal/f286eb91-202c-490b-99dd-04415556e07e/c9203715-f602-4237-9e34-86ed1c8d4297 for client 127.0.0.1 already exists
        at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.startFile(FSDirWriteFileOp.java:388)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2536)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2433)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:791)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:475)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)

You might need to check the
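The NameNode trace shows the append path going through a `create` RPC, which HDFS rejects for an existing file. The semantics the failing test expects can be sketched with a standalone local-filesystem analogue (the `open_for_write` helper and file path are hypothetical, not the PR's code):

```rust
use std::fs::OpenOptions;
use std::io::Write;

// Standalone sketch of the append semantics the failing behavior test
// expects: an append write to an existing file must open it for append
// rather than issuing a create call (which HDFS rejects with AlreadyExists).
// `open_for_write` is a hypothetical helper for illustration.
fn open_for_write(path: &str, append: bool) -> std::io::Result<std::fs::File> {
    if append {
        // Open the existing file (creating it only if absent) and
        // position writes at the end.
        OpenOptions::new().create(true).append(true).open(path)
    } else {
        // Plain write: create or truncate.
        OpenOptions::new().create(true).write(true).truncate(true).open(path)
    }
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("opendal_append_sketch.txt");
    let path = path.to_str().unwrap();

    open_for_write(path, false)?.write_all(b"first")?;
    // The second write with append must succeed even though the file exists.
    open_for_write(path, true)?.write_all(b" second")?;

    assert_eq!(std::fs::read_to_string(path)?, "first second");
    std::fs::remove_file(path)?;
    Ok(())
}
```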
If you want to debug them locally, you can use
Thanks, I'll try debugging locally |
Thanks. I am tracking down the test failures that occur when the option recursive=true is set, for example test_list_empty_dir.
@Xuanwo CI passed again. PTAL. |
Which issue does this PR close?
Closes #.
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?