Skip to content

Commit ded161f

Browse files
jean-philippe-martinmziccard
authored andcommitted
Add ParallelCountBytes
Made it and CountBytes compute an MD5, so I could check that they match (they do).
1 parent 9e94824 commit ded161f

3 files changed

Lines changed: 184 additions & 0 deletions

File tree

gcloud-java-examples/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@
6868
<mainClass>com.google.cloud.examples.nio.CountBytes</mainClass>
6969
<name>CountBytes</name>
7070
</program>
71+
<program>
72+
<mainClass>com.google.cloud.examples.nio.ParallelCountBytes</mainClass>
73+
<name>ParallelCountBytes</name>
74+
</program>
7175
<program>
7276
<mainClass>
7377
com.google.cloud.examples.resourcemanager.ResourceManagerExample

gcloud-java-examples/src/main/java/com/google/cloud/examples/nio/CountBytes.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
import com.google.common.base.Stopwatch;
2020

21+
import javax.xml.bind.annotation.adapters.HexBinaryAdapter;
2122
import java.io.IOException;
2223
import java.net.URI;
2324
import java.nio.ByteBuffer;
2425
import java.nio.channels.SeekableByteChannel;
2526
import java.nio.file.Files;
2627
import java.nio.file.Path;
2728
import java.nio.file.Paths;
29+
import java.security.MessageDigest;
2830
import java.util.concurrent.TimeUnit;
2931

3032
/**
@@ -72,15 +74,19 @@ private static void countFile(String fname) {
7274
SeekableByteChannel chan = Files.newByteChannel(path);
7375
long total = 0;
7476
int readCalls = 0;
77+
MessageDigest md = MessageDigest.getInstance("MD5");
7578
while (chan.read(buf) > 0) {
7679
readCalls++;
80+
md.update(buf.array(), 0, buf.position());
7781
total += buf.position();
7882
buf.flip();
7983
}
8084
readCalls++; // We must count the last call
8185
long elapsed = sw.elapsed(TimeUnit.SECONDS);
8286
System.out.println("Read all " + total + " bytes in " + elapsed + "s. " +
8387
"(" + readCalls +" calls to chan.read)");
88+
String hex = (new HexBinaryAdapter()).marshal(md.digest());
89+
System.out.println("The MD5 is: 0x" + hex);
8490
if (total != size) {
8591
System.out.println("Wait, this doesn't match! We saw " + total + " bytes, " +
8692
"yet the file size is listed at " + size + " bytes.");
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.examples.nio;
18+
19+
import com.google.common.base.Stopwatch;
20+
21+
import javax.xml.bind.annotation.adapters.HexBinaryAdapter;
22+
import java.io.IOException;
23+
import java.net.URI;
24+
import java.nio.ByteBuffer;
25+
import java.nio.channels.SeekableByteChannel;
26+
import java.nio.file.Files;
27+
import java.nio.file.Path;
28+
import java.nio.file.Paths;
29+
import java.security.MessageDigest;
30+
import java.util.concurrent.TimeUnit;
31+
32+
/**
33+
* ParallelCountBytes will read through the whole file given as input.
34+
*
35+
* <p>This example shows how to go through all the contents of a file,
36+
* in order, using multithreaded NIO reads.It also reports how long it took.
37+
*
38+
* <p>See the README for compilation instructions. Run this code with
39+
* {@code target/appassembler/bin/ParallelCountBytes <file>}
40+
*/
41+
public class ParallelCountBytes {
42+
43+
private class BufWithLock {
44+
public Object lock;
45+
public ByteBuffer buf;
46+
public boolean full;
47+
public Thread t;
48+
49+
public BufWithLock(int size) {
50+
this.buf = ByteBuffer.allocate(size);
51+
this.lock = new Object();
52+
}
53+
}
54+
55+
/**
56+
* See the class documentation.
57+
*/
58+
public static void main(String[] args) throws IOException {
59+
new ParallelCountBytes().start(args);
60+
}
61+
62+
public void start(String[] args) throws IOException {
63+
if (args.length == 0 || args[0].equals("--help")) {
64+
help();
65+
return;
66+
}
67+
for (String a : args) {
68+
countFile(a);
69+
}
70+
}
71+
72+
private void stridedRead(SeekableByteChannel chan, int blockSize, int firstBlock, int stride, BufWithLock output) {
73+
try {
74+
// stagger the threads a little bit.
75+
Thread.sleep(250 * firstBlock);
76+
long pos = firstBlock * blockSize;
77+
synchronized(output.lock) {
78+
while (true) {
79+
if (pos > chan.size()) {
80+
break;
81+
}
82+
chan.position(pos);
83+
// read until buffer is full, or EOF
84+
while (chan.read(output.buf) > 0) {};
85+
output.full = true;
86+
output.lock.notifyAll();
87+
if (output.buf.hasRemaining()) {
88+
break;
89+
}
90+
// wait for main thread to process it
91+
while (output.full) {
92+
output.lock.wait();
93+
}
94+
output.buf.flip();
95+
pos += stride * blockSize;
96+
}
97+
}
98+
} catch (InterruptedException | IOException o) {
99+
// this simple example doesn't handle errors, sorry.
100+
}
101+
}
102+
103+
/**
104+
* Print the length of the indicated file.
105+
*
106+
* <p>This uses the normal Java NIO Api, so it can take advantage of any installed
107+
* NIO Filesystem provider without any extra effort.
108+
*/
109+
private void countFile(String fname) throws IOException{
110+
// large buffers pay off
111+
final int bufSize = 50 * 1024 * 1024;
112+
try {
113+
Path path = Paths.get(new URI(fname));
114+
long size = Files.size(path);
115+
System.out.println(fname + ": " + size + " bytes.");
116+
ByteBuffer buf = ByteBuffer.allocate(bufSize);
117+
int nBlocks = (int)Math.ceil( size / (double)bufSize);
118+
int nThreads = nBlocks;
119+
if (nThreads > 4) nThreads = 4;
120+
System.out.println("Reading the whole file using " + nThreads + " threads...");
121+
Stopwatch sw = Stopwatch.createStarted();
122+
final BufWithLock[] bufs = new BufWithLock[nThreads];
123+
for (int i = 0; i < nThreads; i++) {
124+
bufs[i] = new BufWithLock(bufSize);
125+
final SeekableByteChannel chan = Files.newByteChannel(path);
126+
final int finalNThreads = nThreads;
127+
final int finalI = i;
128+
bufs[i].t = new Thread(new Runnable() {
129+
@Override
130+
public void run() {
131+
stridedRead(chan, bufSize, finalI, finalNThreads, bufs[finalI]);
132+
}
133+
});
134+
bufs[i].t.start();
135+
}
136+
137+
long total = 0;
138+
MessageDigest md = MessageDigest.getInstance("MD5");
139+
for (int block = 0; block < nBlocks; block++) {
140+
BufWithLock bwl = bufs[block % bufs.length];
141+
synchronized (bwl.lock) {
142+
while (!bwl.full) {
143+
bwl.lock.wait();
144+
}
145+
md.update(bwl.buf.array(), 0, bwl.buf.position());
146+
total += bwl.buf.position();
147+
bwl.full = false;
148+
bwl.lock.notifyAll();
149+
}
150+
}
151+
152+
long elapsed = sw.elapsed(TimeUnit.SECONDS);
153+
System.out.println("Read all " + total + " bytes in " + elapsed + "s. ");
154+
String hex = (new HexBinaryAdapter()).marshal(md.digest());
155+
System.out.println("The MD5 is: 0x" + hex);
156+
if (total != size) {
157+
System.out.println("Wait, this doesn't match! We saw " + total + " bytes, " +
158+
"yet the file size is listed at " + size + " bytes.");
159+
}
160+
} catch (Exception ex) {
161+
System.out.println(fname + ": " + ex.toString());
162+
}
163+
}
164+
165+
private static void help() {
166+
String[] help =
167+
{"The argument is a <path>",
168+
"and we show the length of that file."
169+
};
170+
for (String s : help) {
171+
System.out.println(s);
172+
}
173+
}
174+
}

0 commit comments

Comments
 (0)