Skip to content

Conversation

@hugoasdf
Copy link
Contributor

@hugoasdf hugoasdf commented Jul 1, 2019

What is the purpose of the change

[Feature] Support message trace feature

Brief changelog

add message trace feature for cpp sdk,add hook,dispatcher

Verifying this change

can send message trace message.
TBD:web console show.

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.

  • Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when a cross-module dependency exists.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

ifplusor and others added 5 commits July 1, 2019 00:30
…variable.

declare correct string::size_type by auto. (apache#143)
* update network interface.

- feature: use only one event loop for all TcpTransport.
- update: network components.

* remove boost mutex, timed_mutex and condition_variable in TcpRemotingClient, TcpTransport and ReponseFunture.
#include "AsyncTraceDispatcher.h"
namespace rocketmq {

MQConsumer::MQConsumer(PullRequest* request) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where the param request is used?



private:
std::shared_ptr<TraceDispatcher> traceDispatcher;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to 'm_traceDispatcher'

//DefaultMQProducerImpl defaultMQProducerImpl;


std::vector<std::shared_ptr<ConsumeMessageHook> > consumeMessageHookList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.



MQConsumer::~MQConsumer() {
if (traceDispatcher.use_count()>0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use 'use_count() > 0' as condition, but 'traceDispatcher != nullptr'.



#include "AsyncTraceDispatcher.h"
#include <boost/shared_ptr.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where use boost::shared_ptr or boost::weak_ptr ?

@@ -0,0 +1,165 @@
#ifndef __AsyncTraceDispatcher_H__
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add Apache License declaration.

//DefaultMQProducer* traceProducer;
std::string TraceTopicName;
AsyncRunnable_run_context(bool stoppedv, int batchSizev,
//AsyncTraceDispatcher* atdv,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format all changed files with clang-format.

public:
SendMessageTraceHookImpl(std::shared_ptr<TraceDispatcher>& localDispatcher);
virtual std::string hookName();
//virtual void sendMessageBefore(SendMessageContext* context);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove unnecessary functions.


// Caculate the cost time for processing messages
int costTime = 0;//(int)((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calculate real costTime.

*/

/*
package org.apache.rocketmq.client.trace.hook;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove unnecessary code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review recommends are read,checks will become,thanks.

remove unnecessary code.

@@ -0,0 +1,118 @@
#ifndef __AsyncTraceDispatcher_H__
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use big alphabet for macro definition, the same for following files
AsyncTraceDispatcher_H change to ASYNCTRACEDISPATCHER_H


class AsyncTraceDispatcher : public TraceDispatcher, public enable_shared_from_this<AsyncTraceDispatcher> {
private:
// static InternalLogger log; //= ClientLogger.getLog();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete the unnecessary code for new class

std::condition_variable m_appenderQueuenotEmpty;

std::thread* m_shutDownHook;
bool m_stopped = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not assignment class variant here

// std::thread* worker;
std::shared_ptr<std::thread> m_worker;

public:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why here is public class members ?


class ConsumeMessageHook {
public:
virtual std::string hookName() { return ""; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here format, please check if there is format problem for others

class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer {
public:
DefaultMQProducer(const std::string& groupname);
DefaultMQProducer(const std::string& groupname, bool Withouttrace = false, void* rpcHook = nullptr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notice the name format for funtion variant, such as Withouttrace, please check others

static std::string LOCAL_ADDRESS; //= "/*UtilAll.ipToIPv4Str(UtilAll.getIP())*/";

private:
std::string m_topic = "";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need assignment here

const string MQMessage::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";
const string MQMessage::PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
const string MQMessage::PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
const string MQMessage::CONSUME_CONTEXT_TYPE = "ConsumeContextType";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep the same style for parameter name

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep the same style for parameter name

updated,-:)

@hugoasdf hugoasdf changed the title [feature]message trace [Feature] Support message trace feature Jul 16, 2019
Copy link
Contributor

@jovany-wang jovany-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, I have 2 suggestions:

  1. Add doc comments for the methods.
  2. Show some usages.

#include "TraceDispatcher.h"
namespace rocketmq {

class AsyncTraceDispatcher : public TraceDispatcher, public enable_shared_from_this<AsyncTraceDispatcher> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public enable_shared_from_this<AsyncTraceDispatcher> seems can be private here right?

std::atomic<long> m_discardCount;
std::shared_ptr<std::thread> m_worker;

// public:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this line?

std::atomic<bool> m_delydelflag;

public:
bool get_stopped() { return m_stopped; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool get_stopped() { return m_stopped; }
bool get_stopped() const { return m_stopped; }

virtual void start(std::string nameSrvAddr, AccessChannel accessChannel = AccessChannel::LOCAL);

virtual void setdelydelflag(bool v) { m_delydelflag = v; }
bool getdelydelflag() { return m_delydelflag; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool getdelydelflag() { return m_delydelflag; }
bool getdelydelflag() const { return m_delydelflag; }

std::string& getTraceTopicName() { return m_traceTopicName; }

void setTraceTopicName(std::string traceTopicNamev) { m_traceTopicName = traceTopicNamev; }
bool getisStarted() { return m_isStarted.load(); };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool getisStarted() { return m_isStarted.load(); };
bool getisStarted() const { return m_isStarted.load(); };


} // namespace rocketmq

#endif No newline at end of file
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add new blank line at the end of this file.

std::string msgnamespace;

public:
std::string getConsumerGroup() { return consumerGroup; };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::string getConsumerGroup() { return consumerGroup; };
std::string getConsumerGroup() const { return consumerGroup; };


void setConsumerGroup(std::string consumerGroup) { consumerGroup = consumerGroup; };

std::list<MQMessageExt> getMsgList() { return msgList; };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
std::list<MQMessageExt> getMsgList() { return msgList; };
std::list<MQMessageExt> getMsgList() const { return msgList; };


void setMsgList(std::list<MQMessageExt> msgList) { msgList = msgList; };
void setMsgList(std::vector<MQMessageExt> pmsgList) { msgList.assign(pmsgList.begin(), pmsgList.end()); };
MQMessageQueue getMq() { return mq; };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
MQMessageQueue getMq() { return mq; };
MQMessageQueue getMq() const { return mq; };


class ConsumeMessageHook {
public:
virtual std::string hookName() { return ""; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hookName -> getHookname?

@ShannonDing ShannonDing added the enhancement New feature or request label Jan 10, 2020
@ShannonDing ShannonDing linked an issue Feb 28, 2020 that may be closed by this pull request
@ShannonDing
Copy link
Member

ShannonDing commented Mar 17, 2020

thanks for your PR,
according to the conflicts, the code was refactored in #276, this PR will be closed.
welcome testing this feature.
the message trace, for the async sending, is not supported now, and welcome to add this functionality.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE]Try to support trace messages.

5 participants