-
Notifications
You must be signed in to change notification settings - Fork 166
[Feature] Support message trace feature #162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…variable. declare correct string::size_type by auto. (apache#143)
* update network interface. - feature: use only one event loop for all TcpTransport. - update: network components. * remove boost mutex, timed_mutex and condition_variable in TcpRemotingClient, TcpTransport and ReponseFunture.
src/consumer/MQConsumer.cpp
Outdated
| #include "AsyncTraceDispatcher.h" | ||
| namespace rocketmq { | ||
|
|
||
| MQConsumer::MQConsumer(PullRequest* request) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where the param request is used?
include/MQConsumer.h
Outdated
|
|
||
|
|
||
| private: | ||
| std::shared_ptr<TraceDispatcher> traceDispatcher; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to 'm_traceDispatcher'
include/MQConsumer.h
Outdated
| //DefaultMQProducerImpl defaultMQProducerImpl; | ||
|
|
||
|
|
||
| std::vector<std::shared_ptr<ConsumeMessageHook> > consumeMessageHookList; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above.
src/consumer/MQConsumer.cpp
Outdated
|
|
||
|
|
||
| MQConsumer::~MQConsumer() { | ||
| if (traceDispatcher.use_count()>0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't use 'use_count() > 0' as condition, but 'traceDispatcher != nullptr'.
|
|
||
|
|
||
| #include "AsyncTraceDispatcher.h" | ||
| #include <boost/shared_ptr.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where use boost::shared_ptr or boost::weak_ptr ?
include/AsyncTraceDispatcher.h
Outdated
| @@ -0,0 +1,165 @@ | |||
| #ifndef __AsyncTraceDispatcher_H__ | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add Apache License declaration.
include/AsyncTraceDispatcher.h
Outdated
| //DefaultMQProducer* traceProducer; | ||
| std::string TraceTopicName; | ||
| AsyncRunnable_run_context(bool stoppedv, int batchSizev, | ||
| //AsyncTraceDispatcher* atdv, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
format all changed files with clang-format.
| public: | ||
| SendMessageTraceHookImpl(std::shared_ptr<TraceDispatcher>& localDispatcher); | ||
| virtual std::string hookName(); | ||
| //virtual void sendMessageBefore(SendMessageContext* context); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unnecessary functions.
|
|
||
| // Caculate the cost time for processing messages | ||
| int costTime = 0;//(int)((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size()); | ||
| subAfterContext.setCostTime(costTime);// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
calculate real costTime.
| */ | ||
|
|
||
| /* | ||
| package org.apache.rocketmq.client.trace.hook; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unnecessary code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
review recommends are read,checks will become,thanks.
remove unnecessary code.
include/AsyncTraceDispatcher.h
Outdated
| @@ -0,0 +1,118 @@ | |||
| #ifndef __AsyncTraceDispatcher_H__ | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please use big alphabet for macro definition, the same for following files
AsyncTraceDispatcher_H change to ASYNCTRACEDISPATCHER_H
include/AsyncTraceDispatcher.h
Outdated
|
|
||
| class AsyncTraceDispatcher : public TraceDispatcher, public enable_shared_from_this<AsyncTraceDispatcher> { | ||
| private: | ||
| // static InternalLogger log; //= ClientLogger.getLog(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete the unnecessary code for new class
include/AsyncTraceDispatcher.h
Outdated
| std::condition_variable m_appenderQueuenotEmpty; | ||
|
|
||
| std::thread* m_shutDownHook; | ||
| bool m_stopped = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not assignment class variant here
include/AsyncTraceDispatcher.h
Outdated
| // std::thread* worker; | ||
| std::shared_ptr<std::thread> m_worker; | ||
|
|
||
| public: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why here is public class members ?
include/ConsumeMessageHook.h
Outdated
|
|
||
| class ConsumeMessageHook { | ||
| public: | ||
| virtual std::string hookName() { return ""; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here format, please check if there is format problem for others
include/DefaultMQProducer.h
Outdated
| class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer { | ||
| public: | ||
| DefaultMQProducer(const std::string& groupname); | ||
| DefaultMQProducer(const std::string& groupname, bool Withouttrace = false, void* rpcHook = nullptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
notice the name format for funtion variant, such as Withouttrace, please check others
include/TraceHelper.h
Outdated
| static std::string LOCAL_ADDRESS; //= "/*UtilAll.ipToIPv4Str(UtilAll.getIP())*/"; | ||
|
|
||
| private: | ||
| std::string m_topic = ""; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need assignment here
src/message/MQMessage.cpp
Outdated
| const string MQMessage::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET"; | ||
| const string MQMessage::PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES"; | ||
| const string MQMessage::PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS"; | ||
| const string MQMessage::CONSUME_CONTEXT_TYPE = "ConsumeContextType"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep the same style for parameter name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep the same style for parameter name
updated,-:)
jovany-wang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically, I have 2 suggestions:
- Add doc comments for the methods.
- Show some usages.
| #include "TraceDispatcher.h" | ||
| namespace rocketmq { | ||
|
|
||
| class AsyncTraceDispatcher : public TraceDispatcher, public enable_shared_from_this<AsyncTraceDispatcher> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public enable_shared_from_this<AsyncTraceDispatcher> seems can be private here right?
| std::atomic<long> m_discardCount; | ||
| std::shared_ptr<std::thread> m_worker; | ||
|
|
||
| // public: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this line?
| std::atomic<bool> m_delydelflag; | ||
|
|
||
| public: | ||
| bool get_stopped() { return m_stopped; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| bool get_stopped() { return m_stopped; } | |
| bool get_stopped() const { return m_stopped; } |
| virtual void start(std::string nameSrvAddr, AccessChannel accessChannel = AccessChannel::LOCAL); | ||
|
|
||
| virtual void setdelydelflag(bool v) { m_delydelflag = v; } | ||
| bool getdelydelflag() { return m_delydelflag; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| bool getdelydelflag() { return m_delydelflag; } | |
| bool getdelydelflag() const { return m_delydelflag; } |
| std::string& getTraceTopicName() { return m_traceTopicName; } | ||
|
|
||
| void setTraceTopicName(std::string traceTopicNamev) { m_traceTopicName = traceTopicNamev; } | ||
| bool getisStarted() { return m_isStarted.load(); }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| bool getisStarted() { return m_isStarted.load(); }; | |
| bool getisStarted() const { return m_isStarted.load(); }; |
|
|
||
| } // namespace rocketmq | ||
|
|
||
| #endif No newline at end of file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add new blank line at the end of this file.
| std::string msgnamespace; | ||
|
|
||
| public: | ||
| std::string getConsumerGroup() { return consumerGroup; }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| std::string getConsumerGroup() { return consumerGroup; }; | |
| std::string getConsumerGroup() const { return consumerGroup; }; |
|
|
||
| void setConsumerGroup(std::string consumerGroup) { consumerGroup = consumerGroup; }; | ||
|
|
||
| std::list<MQMessageExt> getMsgList() { return msgList; }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| std::list<MQMessageExt> getMsgList() { return msgList; }; | |
| std::list<MQMessageExt> getMsgList() const { return msgList; }; |
|
|
||
| void setMsgList(std::list<MQMessageExt> msgList) { msgList = msgList; }; | ||
| void setMsgList(std::vector<MQMessageExt> pmsgList) { msgList.assign(pmsgList.begin(), pmsgList.end()); }; | ||
| MQMessageQueue getMq() { return mq; }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| MQMessageQueue getMq() { return mq; }; | |
| MQMessageQueue getMq() const { return mq; }; |
|
|
||
| class ConsumeMessageHook { | ||
| public: | ||
| virtual std::string hookName() { return ""; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hookName -> getHookname?
|
thanks for your PR, |
What is the purpose of the change
[Feature] Support message trace feature
Brief changelog
add message trace feature for cpp sdk,add hook,dispatcher
Verifying this change
can send message trace message.
TBD:web console show.
Follow this checklist to help us incorporate your contribution quickly and easily. Notice,
it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.[ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.