2222import com .ctrip .framework .apollo .tracer .Tracer ;
2323import com .ctrip .framework .apollo .tracer .spi .Transaction ;
2424import com .google .common .collect .Queues ;
25+ import javax .annotation .PreDestroy ;
2526import org .slf4j .Logger ;
2627import org .slf4j .LoggerFactory ;
2728import org .springframework .stereotype .Component ;
4344public class DatabaseMessageSender implements MessageSender {
4445 private static final Logger logger = LoggerFactory .getLogger (DatabaseMessageSender .class );
4546 private static final int CLEAN_QUEUE_MAX_SIZE = 100 ;
46- private BlockingQueue <Long > toClean = Queues .newLinkedBlockingQueue (CLEAN_QUEUE_MAX_SIZE );
47+ private final BlockingQueue <Long > toClean = Queues .newLinkedBlockingQueue (CLEAN_QUEUE_MAX_SIZE );
4748 private final ExecutorService cleanExecutorService ;
4849 private final AtomicBoolean cleanStopped ;
4950
@@ -68,7 +69,9 @@ public void sendMessage(String message, String channel) {
6869 Transaction transaction = Tracer .newTransaction ("Apollo.AdminService" , "sendMessage" );
6970 try {
7071 ReleaseMessage newMessage = releaseMessageRepository .save (new ReleaseMessage (message ));
71- toClean .offer (newMessage .getId ());
72+ if (!toClean .offer (newMessage .getId ())){
73+ logger .warn ("Queue is full, Failed to add message {} to clean queue" , newMessage .getId ());
74+ }
7275 transaction .setStatus (Transaction .SUCCESS );
7376 } catch (Throwable ex ) {
7477 logger .error ("Sending message to database failed" , ex );
@@ -116,6 +119,7 @@ private void cleanMessage(Long id) {
116119 }
117120 }
118121
122+ @ PreDestroy
119123 void stopClean () {
120124 cleanStopped .set (true );
121125 }
0 commit comments