Skip to content

Commit ebb5e5d

Browse files
Made code similar to Scala, corrected formatting.
1 parent a229ce8 commit ebb5e5d

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

docs/streaming-programming-guide.md

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1249,14 +1249,15 @@ dstream.foreachRDD { rdd =>
12491249
<div data-lang="java" markdown="1">
12501250
{% highlight java %}
12511251
dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
1252-
@Override public void call(JavaRDD<String> rdd) throws Exception {
1253-
final Connection connection = new Connection(connectionString); // executed at the driver
1252+
@Override
1253+
public void call(JavaRDD<String> rdd) {
1254+
final Connection connection = createNewConnection(); // executed at the driver
12541255
rdd.foreach(new VoidFunction<String>() {
1255-
@Override public void call(String record) throws Exception {
1256+
@Override
1257+
public void call(String record) {
12561258
connection.send(record); // executed at the worker
12571259
}
12581260
});
1259-
connection.close();
12601261
}
12611262
});
12621263
{% endhighlight %}
@@ -1297,10 +1298,12 @@ dstream.foreachRDD { rdd =>
12971298
<div data-lang="java" markdown="1">
12981299
{% highlight java %}
12991300
dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
1300-
@Override public void call(JavaRDD<String> rdd) throws Exception {
1301+
@Override
1302+
public void call(JavaRDD<String> rdd) {
13011303
rdd.foreach(new VoidFunction<String>() {
1302-
@Override public void call(String record) throws Exception {
1303-
Connection connection = new Connection(connectionString);
1304+
@Override
1305+
public void call(String record) {
1306+
Connection connection = createNewConnection();
13041307
connection.send(record);
13051308
connection.close();
13061309
}
@@ -1342,10 +1345,12 @@ dstream.foreachRDD { rdd =>
13421345
<div data-lang="java" markdown="1">
13431346
{% highlight java %}
13441347
dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
1345-
@Override public void call(JavaRDD<String> rdd) throws Exception {
1348+
@Override
1349+
public void call(JavaRDD<String> rdd) {
13461350
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
1347-
@Override public void call(Iterator<String> partitionOfRecords) throws Exception {
1348-
Connection connection = new Connection(connectionString);
1351+
@Override
1352+
public void call(Iterator<String> partitionOfRecords) {
1353+
Connection connection = createNewConnection();
13491354
while (partitionOfRecords.hasNext()) {
13501355
connection.send(partitionOfRecords.next());
13511356
}
@@ -1392,14 +1397,17 @@ dstream.foreachRDD { rdd =>
13921397
<div data-lang="java" markdown="1">
13931398
{% highlight java %}
13941399
dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
1395-
@Override public void call(JavaRDD<String> rdd) throws Exception {
1400+
@Override
1401+
public void call(JavaRDD<String> rdd) {
13961402
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
1397-
@Override public void call(Iterator<String> partitionOfRecords) throws Exception {
1398-
Connection connection = ConnectionPool.getConnection(connectionString);
1403+
@Override
1404+
public void call(Iterator<String> partitionOfRecords) {
1405+
// ConnectionPool is a static, lazily initialized pool of connections
1406+
Connection connection = ConnectionPool.getConnection();
13991407
while (partitionOfRecords.hasNext()) {
14001408
connection.send(partitionOfRecords.next());
14011409
}
1402-
ConnectionPool.releaseConnection(connection);
1410+
ConnectionPool.returnConnection(connection); // return to the pool for future reuse
14031411
}
14041412
});
14051413
}

0 commit comments

Comments
 (0)