Skip to content

Commit 4f52eb5

Browse files
committed
Add codec to encode/decode GRPC data frame.
1 parent 1260794 commit 4f52eb5

File tree

4 files changed

+266
-17
lines changed

4 files changed

+266
-17
lines changed

source/common/grpc/codec.cc

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#include "third_party/envoy/src/source/common/grpc/codec.h"
2+
3+
#include "common/buffer/buffer_impl.h"
4+
5+
namespace Grpc {
6+
7+
Encoder::Encoder() {}
8+
9+
void Encoder::NewFrame(uint8_t flags, uint64_t length, uint8_t* output) {
10+
output[0] = flags;
11+
output[1] = static_cast<uint8_t>(length >> 24);
12+
output[2] = static_cast<uint8_t>(length >> 16);
13+
output[3] = static_cast<uint8_t>(length >> 8);
14+
output[4] = static_cast<uint8_t>(length);
15+
}
16+
17+
Decoder::Decoder() : state_(State::FH_FLAG) {}
18+
19+
bool Decoder::Decode(Buffer::Instance& input, std::vector<Frame>* output) {
20+
uint64_t count = input.getRawSlices(nullptr, 0);
21+
Buffer::RawSlice slices[count];
22+
input.getRawSlices(slices, count);
23+
for (Buffer::RawSlice& slice : slices) {
24+
uint8_t* mem = reinterpret_cast<uint8_t*>(slice.mem_);
25+
for (uint64_t j = 0; j < slice.len_;) {
26+
uint8_t c = *mem;
27+
switch (state_) {
28+
case State::FH_FLAG:
29+
if (c & !GRPC_FH_COMPRESSED) {
30+
// Unsupported flags.
31+
return false;
32+
}
33+
frame_.flags = c;
34+
state_ = State::FH_LEN_0;
35+
mem++;
36+
j++;
37+
break;
38+
case State::FH_LEN_0:
39+
frame_.length = static_cast<uint32_t>(c) << 24;
40+
state_ = State::FH_LEN_1;
41+
mem++;
42+
j++;
43+
break;
44+
case State::FH_LEN_1:
45+
frame_.length |= static_cast<uint32_t>(c) << 16;
46+
state_ = State::FH_LEN_2;
47+
mem++;
48+
j++;
49+
break;
50+
case State::FH_LEN_2:
51+
frame_.length |= static_cast<uint32_t>(c) << 8;
52+
state_ = State::FH_LEN_3;
53+
mem++;
54+
j++;
55+
break;
56+
case State::FH_LEN_3:
57+
frame_.length |= static_cast<uint32_t>(c);
58+
frame_.data = new Buffer::OwnedImpl();
59+
state_ = State::DATA;
60+
mem++;
61+
j++;
62+
break;
63+
case State::DATA:
64+
uint64_t remain_in_buffer = slice.len_ - j;
65+
uint64_t remain_in_frame = frame_.length - frame_.data->length();
66+
if (remain_in_buffer <= remain_in_frame) {
67+
frame_.data->add(mem, remain_in_buffer);
68+
mem += remain_in_buffer;
69+
j += remain_in_buffer;
70+
} else {
71+
frame_.data->add(mem, remain_in_frame);
72+
mem += remain_in_frame;
73+
j += remain_in_frame;
74+
}
75+
if (frame_.length == frame_.data->length()) {
76+
output->push_back(frame_); // make a copy.
77+
frame_.flags = 0;
78+
frame_.length = 0;
79+
frame_.data = nullptr;
80+
state_ = State::FH_FLAG;
81+
}
82+
break;
83+
}
84+
}
85+
}
86+
return true;
87+
}
88+
89+
} // namespace Grpc

source/common/grpc/codec.h

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#pragma once
2+
3+
#include "envoy/buffer/buffer.h"
4+
5+
namespace Grpc {
6+
// Last bit for an expanded message without compression.
7+
const uint8_t GRPC_FH_DEFAULT = 0b0u;
8+
// Last bit for a compressed message.
9+
const uint8_t GRPC_FH_COMPRESSED = 0b1u;
10+
11+
enum class CompressionAlgorithm { None, Gzip };
12+
13+
struct Frame {
14+
uint8_t flags;
15+
uint32_t length;
16+
Buffer::Instance* data;
17+
};
18+
19+
class Encoder {
20+
public:
21+
Encoder();
22+
23+
// Creates a new GRPC data frame with the given flags and length.
24+
// @param flags supplies the GRPC data frame flags.
25+
// @param length supplies the GRPC data frame length.
26+
// @param output the buffer to store the encoded data, it's size must be 5.
27+
void NewFrame(uint8_t flags, uint64_t length, uint8_t* output);
28+
};
29+
30+
class Decoder {
31+
public:
32+
Decoder();
33+
34+
// Decodes the given buffer with GRPC data frame.
35+
// @param input supplies the binary octets wrapped in a GRPC data frame.
36+
// @param input supplies the buffer to store the decoded data.
37+
// @return bool whether the decoding success.
38+
bool Decode(Buffer::Instance& input, std::vector<Frame>* output);
39+
40+
private:
41+
// Wiring format of GRPC data frame header:
42+
// -----------------------------------------------------------------------
43+
// |R|R|R|R|R|R|R|R|C| L | L | L | L |
44+
// -----------------------------------------------------------------------
45+
// Flag (1 byte) Message Length (4 bytes)
46+
//
47+
// A fixed header consists of five bytes.
48+
// The first byte is the Flag. The last one "C" bit indicates if the message
49+
// is compressed or not (0 is uncompressed, 1 is compressed). The rest seven
50+
// "R" bits is reserved for future use.
51+
// The next four "L" bytes represent the message length in BigEndian format.
52+
enum class State {
53+
// Waiting for decoding the flags (1 byte) of the GRPC data frame.
54+
FH_FLAG,
55+
// Waiting for decoding the 1st byte of the length (4 bytes in total) of the
56+
// GRPC data frame.
57+
FH_LEN_0,
58+
// Waiting for decoding the 2nd byte of the length (4 bytes in total) of the
59+
// GRPC data frame.
60+
FH_LEN_1,
61+
// Waiting for decoding the 3rd byte of the length (4 bytes in total) of the
62+
// GRPC data frame.
63+
FH_LEN_2,
64+
// Waiting for decoding the 4th byte of the length (4 bytes in total) of the
65+
// GRPC data frame.
66+
FH_LEN_3,
67+
// Waiting for decoding the data.
68+
DATA,
69+
};
70+
71+
State state_;
72+
Frame frame_;
73+
};
74+
} // Grpc

test/CMakeLists.txt

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,20 @@ include_directories(${PROJECT_BINARY_DIR})
2020
include_directories(SYSTEM ${ENVOY_OPENSSL_INCLUDE_DIR})
2121
include_directories(${ENVOY_NGHTTP2_INCLUDE_DIR})
2222
include_directories(SYSTEM ${ENVOY_LIGHTSTEP_TRACER_INCLUDE_DIR})
23-
include_directories(${ENVOY_LIBEVENT_INCLUDE_DIR})
2423

2524
add_executable(envoy-test
2625
$<TARGET_OBJECTS:envoy-server>
2726
$<TARGET_OBJECTS:envoy-common>
2827
${ENVOY_TEST_EXTRA_OBJECTS}
2928
common/access_log/access_log_manager_impl_test.cc
3029
common/api/api_impl_test.cc
31-
common/common/base64_test.cc
3230
common/common/hex_test.cc
3331
common/common/optional_test.cc
3432
common/common/utility_test.cc
35-
common/event/dispatcher_impl_test.cc
3633
common/event/file_event_impl_test.cc
3734
common/filesystem/filesystem_impl_test.cc
3835
common/filesystem/watcher_impl_test.cc
36+
common/grpc/codec_test.cc
3937
common/grpc/common_test.cc
4038
common/grpc/http1_bridge_filter_test.cc
4139
common/grpc/rpc_channel_impl_test.cc
@@ -47,9 +45,7 @@ add_executable(envoy-test
4745
common/http/common.cc
4846
common/http/conn_manager_impl_test.cc
4947
common/http/conn_manager_utility_test.cc
50-
common/http/date_provider_impl_test.cc
5148
common/http/filter/buffer_filter_test.cc
52-
common/http/filter/fault_filter_test.cc
5349
common/http/filter/ratelimit_test.cc
5450
common/http/header_map_impl_test.cc
5551
common/http/http1/codec_impl_test.cc
@@ -72,28 +68,20 @@ add_executable(envoy-test
7268
common/network/connection_impl_test.cc
7369
common/network/dns_impl_test.cc
7470
common/network/filter_manager_impl_test.cc
75-
common/network/listener_impl_test.cc
7671
common/network/listen_socket_impl_test.cc
7772
common/network/proxy_protocol_test.cc
7873
common/network/utility_test.cc
7974
common/ratelimit/ratelimit_impl_test.cc
80-
common/redis/codec_impl_test.cc
81-
common/redis/conn_pool_impl_test.cc
82-
common/redis/proxy_filter_test.cc
8375
common/router/config_impl_test.cc
8476
common/router/retry_state_impl_test.cc
8577
common/router/router_test.cc
86-
common/router/router_ratelimit_test.cc
8778
common/router/shadow_writer_impl_test.cc
8879
common/runtime/runtime_impl_test.cc
8980
common/runtime/uuid_util_test.cc
9081
common/ssl/connection_impl_test.cc
9182
common/ssl/context_impl_test.cc
92-
common/stats/stats_impl_test.cc
9383
common/stats/statsd_test.cc
94-
common/stats/thread_local_store_test.cc
9584
common/tracing/http_tracer_impl_test.cc
96-
common/upstream/cds_api_impl_test.cc
9785
common/upstream/cluster_manager_impl_test.cc
9886
common/upstream/health_checker_impl_test.cc
9987
common/upstream/host_utility_test.cc
@@ -125,10 +113,8 @@ add_executable(envoy-test
125113
mocks/filesystem/mocks.cc
126114
mocks/grpc/mocks.cc
127115
mocks/http/mocks.cc
128-
mocks/local_info/mocks.cc
129116
mocks/network/mocks.cc
130117
mocks/ratelimit/mocks.cc
131-
mocks/redis/mocks.cc
132118
mocks/router/mocks.cc
133119
mocks/runtime/mocks.cc
134120
mocks/server/mocks.cc
@@ -137,8 +123,6 @@ add_executable(envoy-test
137123
mocks/thread_local/mocks.cc
138124
mocks/tracing/mocks.cc
139125
mocks/upstream/mocks.cc
140-
server/config/http/config_test.cc
141-
server/config/network/config_test.cc
142126
server/config/network/http_connection_manager_test.cc
143127
server/configuration_impl_test.cc
144128
server/connection_handler_test.cc
@@ -154,8 +138,10 @@ if (ENVOY_TCMALLOC)
154138
endif()
155139

156140
target_link_libraries(envoy-test event)
141+
target_link_libraries(envoy-test event_openssl)
157142
target_link_libraries(envoy-test event_pthreads)
158143
target_link_libraries(envoy-test http_parser)
144+
target_link_libraries(envoy-test jansson)
159145
target_link_libraries(envoy-test ssl)
160146
target_link_libraries(envoy-test crypto)
161147
target_link_libraries(envoy-test nghttp2)

test/common/grpc/codec_test.cc

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
#include "common/buffer/buffer_impl.h"
2+
#include "common/grpc/codec.h"
3+
#include "test/generated/helloworld.pb.h"
4+
5+
namespace Grpc {
6+
7+
TEST(CodecTest, encodeHeader) {
8+
Encoder encoder;
9+
uint8_t buffer[5];
10+
11+
encoder.NewFrame(GRPC_FH_DEFAULT, 1, buffer);
12+
EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT);
13+
EXPECT_EQ(buffer[1], 0);
14+
EXPECT_EQ(buffer[2], 0);
15+
EXPECT_EQ(buffer[3], 0);
16+
EXPECT_EQ(buffer[4], 1);
17+
18+
encoder.NewFrame(GRPC_FH_COMPRESSED, 1, buffer);
19+
EXPECT_EQ(buffer[0], GRPC_FH_COMPRESSED);
20+
EXPECT_EQ(buffer[1], 0);
21+
EXPECT_EQ(buffer[2], 0);
22+
EXPECT_EQ(buffer[3], 0);
23+
EXPECT_EQ(buffer[4], 1);
24+
25+
encoder.NewFrame(GRPC_FH_DEFAULT, 0x100, buffer);
26+
EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT);
27+
EXPECT_EQ(buffer[1], 0);
28+
EXPECT_EQ(buffer[2], 0);
29+
EXPECT_EQ(buffer[3], 1);
30+
EXPECT_EQ(buffer[4], 0);
31+
32+
encoder.NewFrame(GRPC_FH_DEFAULT, 0x10000, buffer);
33+
EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT);
34+
EXPECT_EQ(buffer[1], 0);
35+
EXPECT_EQ(buffer[2], 1);
36+
EXPECT_EQ(buffer[3], 0);
37+
EXPECT_EQ(buffer[4], 0);
38+
39+
encoder.NewFrame(GRPC_FH_DEFAULT, 0x1000000, buffer);
40+
EXPECT_EQ(buffer[0], GRPC_FH_DEFAULT);
41+
EXPECT_EQ(buffer[1], 1);
42+
EXPECT_EQ(buffer[2], 0);
43+
EXPECT_EQ(buffer[3], 0);
44+
EXPECT_EQ(buffer[4], 0);
45+
}
46+
47+
TEST(CodecTest, decodeSingleFrame) {
48+
helloworld::HelloRequest request;
49+
request.set_name("hello");
50+
51+
Buffer::OwnedImpl buffer;
52+
uint8_t header[5];
53+
Encoder encoder;
54+
encoder.NewFrame(GRPC_FH_DEFAULT, request.ByteSizeLong(), header);
55+
buffer.add(header, 5);
56+
buffer.add(request.SerializeAsString());
57+
58+
std::vector<Frame> frames;
59+
60+
Decoder decoder;
61+
decoder.Decode(buffer, &frames);
62+
EXPECT_EQ(frames.size(), 1);
63+
EXPECT_EQ(GRPC_FH_DEFAULT, frames[0].flags);
64+
EXPECT_EQ(request.ByteSizeLong(), frames[0].length);
65+
66+
helloworld::HelloRequest result;
67+
result.ParseFromArray(frames[0].data->linearize(frames[0].data->length()),
68+
frames[0].data->length());
69+
EXPECT_EQ("hello", result.name());
70+
}
71+
72+
TEST(CodecTest, decodeMultipleFrame) {
73+
helloworld::HelloRequest request;
74+
request.set_name("hello");
75+
76+
Buffer::OwnedImpl buffer;
77+
uint8_t header[5];
78+
Encoder encoder;
79+
encoder.NewFrame(GRPC_FH_DEFAULT, request.ByteSizeLong(), header);
80+
for (int i = 0; i < 1009; i++) {
81+
buffer.add(header, 5);
82+
buffer.add(request.SerializeAsString());
83+
}
84+
85+
std::vector<Frame> frames;
86+
87+
Decoder decoder;
88+
decoder.Decode(buffer, &frames);
89+
EXPECT_EQ(frames.size(), 1009);
90+
for (Frame& frame : frames) {
91+
EXPECT_EQ(GRPC_FH_DEFAULT, frame.flags);
92+
EXPECT_EQ(request.ByteSizeLong(), frame.length);
93+
94+
helloworld::HelloRequest result;
95+
result.ParseFromArray(frame.data->linearize(frame.data->length()),
96+
frame.data->length());
97+
EXPECT_EQ("hello", result.name());
98+
}
99+
}
100+
} // Grpc

0 commit comments

Comments
 (0)