Skip to content

Commit 1924dbb

Browse files
pongadgarrettjonesgoogle
authored andcommitted
---
yaml --- r: 5507 b: refs/heads/master c: d726f41 h: refs/heads/master i: 5505: a490bf8 5503: e36c32f
1 parent c53d046 commit 1924dbb

3 files changed

Lines changed: 98 additions & 18 deletions

File tree

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: 9012428ac739979361c0af045323217fd41687be
2+
refs/heads/master: d726f41c099c8213ea66eeec6f892887de65ebb4
33
refs/heads/travis: dae77e558b884bc1b165155482d76c8e40b0fca4
44
refs/heads/gh-pages: 4936f6d1c43be1ab76229d2743bae07f4b4124b3
55
refs/tags/0.0.9: 22f1839238f66c39e67ed4dfdcd273b1ae2e8444

trunk/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingHandler.java

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818

1919
import static com.google.common.base.MoreObjects.firstNonNull;
2020

21+
import com.google.api.gax.core.ApiFuture;
22+
import com.google.api.gax.core.ApiFutureCallback;
23+
import com.google.api.gax.core.ApiFutures;
2124
import com.google.cloud.MonitoredResource;
2225
import com.google.cloud.logging.Logging.WriteOption;
23-
import com.google.api.gax.core.ApiFutures;
24-
import com.google.api.gax.core.ApiFutureCallback;
2526
import com.google.common.collect.ImmutableList;
2627
import com.google.common.collect.ImmutableMap;
28+
import com.google.common.util.concurrent.Uninterruptibles;
2729
import java.util.ArrayList;
2830
import java.util.Collections;
29-
import java.util.LinkedList;
31+
import java.util.IdentityHashMap;
3032
import java.util.List;
33+
import java.util.Set;
3134
import java.util.logging.ErrorManager;
3235
import java.util.logging.Filter;
3336
import java.util.logging.Formatter;
@@ -120,6 +123,10 @@ public class LoggingHandler extends Handler {
120123
// https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1740 .
121124
private final Level baseLevel;
122125

126+
private final Object writeLock = new Object();
127+
private final Set<ApiFuture<Void>> pendingWrites =
128+
Collections.newSetFromMap(new IdentityHashMap<ApiFuture<Void>, Boolean>());
129+
123130
/**
124131
* Creates an handler that publishes messages to Stackdriver Logging.
125132
*/
@@ -376,6 +383,9 @@ public void publish(LogRecord record) {
376383
if (entry != null) {
377384
write(entry, writeOptions);
378385
}
386+
if (record.getLevel().intValue() >= flushLevel.intValue()) {
387+
flush();
388+
}
379389
} finally {
380390
inPublishCall.remove();
381391
}
@@ -457,28 +467,60 @@ void write(LogEntry entry, WriteOption... options) {
457467
reportError(null, ex, ErrorManager.FLUSH_FAILURE);
458468
}
459469
break;
470+
460471
case ASYNC:
461472
default:
462-
ApiFutures.addCallback(getLogging().writeAsync(entryList, options), new ApiFutureCallback<Void>() {
463-
@Override
464-
public void onSuccess(Void v) {}
465-
466-
@Override
467-
public void onFailure(Throwable t) {
468-
if (t instanceof Exception) {
469-
reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE);
470-
} else {
471-
reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE);
472-
}
473-
}
474-
});
473+
final ApiFuture<Void> writeFuture = getLogging().writeAsync(entryList, options);
474+
synchronized(writeLock) {
475+
pendingWrites.add(writeFuture);
476+
}
477+
ApiFutures.addCallback(
478+
writeFuture,
479+
new ApiFutureCallback<Void>() {
480+
private void removeFromPending() {
481+
synchronized(writeLock) {
482+
pendingWrites.remove(writeFuture);
483+
}
484+
}
485+
486+
@Override
487+
public void onSuccess(Void v) {
488+
removeFromPending();
489+
}
490+
491+
@Override
492+
public void onFailure(Throwable t) {
493+
try {
494+
if (t instanceof Exception) {
495+
reportError(null, (Exception) t, ErrorManager.FLUSH_FAILURE);
496+
} else {
497+
reportError(null, new Exception(t), ErrorManager.FLUSH_FAILURE);
498+
}
499+
} finally {
500+
removeFromPending();
501+
}
502+
}
503+
});
475504
break;
476505
}
477506
}
478507

479508
@Override
480509
public void flush() {
481-
// BUG(1795): flush is broken, need support from batching implementation.
510+
// BUG(1795): We should force batcher to issue RPC call for buffered messages,
511+
// so the code below doesn't wait uselessly.
512+
513+
ArrayList<ApiFuture<Void>> writesToFlush = new ArrayList<>();
514+
synchronized(writeLock) {
515+
writesToFlush.addAll(pendingWrites);
516+
}
517+
for (ApiFuture<Void> write : writesToFlush) {
518+
try {
519+
Uninterruptibles.getUninterruptibly(write);
520+
} catch (Exception e) {
521+
// Ignore exceptions, they are propagated to the error manager.
522+
}
523+
}
482524
}
483525

484526
/**

trunk/google-cloud-logging/src/test/java/com/google/cloud/logging/LoggingHandlerTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818

1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertNotNull;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.junit.Assert.assertFalse;
2123

2224
import com.google.api.gax.core.ApiFutures;
25+
import com.google.api.gax.core.SettableApiFuture;
2326
import com.google.cloud.MonitoredResource;
2427
import com.google.cloud.logging.LogEntry.Builder;
2528
import com.google.cloud.logging.Logging.WriteOption;
@@ -380,6 +383,41 @@ public void testFlushLevel() {
380383
handler.publish(newLogRecord(Level.WARNING, MESSAGE));
381384
}
382385

386+
@Test
387+
public void testFlush() throws InterruptedException {
388+
final SettableApiFuture<Void> mockRpc = SettableApiFuture.create();
389+
390+
EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();
391+
EasyMock.expect(options.getService()).andReturn(logging);
392+
logging.writeAsync(ImmutableList.of(INFO_ENTRY), DEFAULT_OPTIONS);
393+
EasyMock.expectLastCall().andReturn(mockRpc);
394+
EasyMock.replay(options, logging);
395+
final LoggingHandler handler = new LoggingHandler(LOG_NAME, options);
396+
handler.setFormatter(new TestFormatter());
397+
398+
// no messages, nothing to flush.
399+
handler.flush();
400+
401+
// send a message
402+
handler.publish(newLogRecord(Level.INFO, MESSAGE));
403+
Thread flushWaiter = new Thread(new Runnable() {
404+
@Override
405+
public void run() {
406+
handler.flush();
407+
}
408+
});
409+
flushWaiter.start();
410+
411+
// flushWaiter should be waiting for mockRpc to complete.
412+
flushWaiter.join(1000);
413+
assertTrue(flushWaiter.isAlive());
414+
415+
// With the RPC completed, flush should return, and the thread should terminate.
416+
mockRpc.set(null);
417+
flushWaiter.join(1000);
418+
assertFalse(flushWaiter.isAlive());
419+
}
420+
383421
@Test
384422
public void testSyncWrite() {
385423
EasyMock.expect(options.getProjectId()).andReturn(PROJECT).anyTimes();

0 commit comments

Comments
 (0)