@@ -1249,14 +1249,15 @@ dstream.foreachRDD { rdd =>
12491249<div data-lang =" java " markdown =" 1 " >
12501250{% highlight java %}
12511251dstream.foreachRDD(new VoidFunction<JavaRDD<String >>() {
1252- @Override public void call(JavaRDD<String > rdd) throws Exception {
1253- final Connection connection = new Connection(connectionString); // executed at the driver
1252+ @Override
1253+ public void call(JavaRDD<String > rdd) {
1254+ final Connection connection = createNewConnection(); // executed at the driver
12541255 rdd.foreach(new VoidFunction<String >() {
1255- @Override public void call(String record) throws Exception {
1256+ @Override
1257+ public void call(String record) {
12561258 connection.send(record); // executed at the worker
12571259 }
12581260 });
1259- connection.close();
12601261 }
12611262});
12621263{% endhighlight %}
@@ -1297,10 +1298,12 @@ dstream.foreachRDD { rdd =>
12971298<div data-lang =" java " markdown =" 1 " >
12981299{% highlight java %}
12991300dstream.foreachRDD(new VoidFunction<JavaRDD<String >>() {
1300- @Override public void call(JavaRDD<String > rdd) throws Exception {
1301+ @Override
1302+ public void call(JavaRDD<String > rdd) {
13011303 rdd.foreach(new VoidFunction<String >() {
1302- @Override public void call(String record) throws Exception {
1303- Connection connection = new Connection(connectionString);
1304+ @Override
1305+ public void call(String record) {
1306+ Connection connection = createNewConnection();
13041307 connection.send(record);
13051308 connection.close();
13061309 }
@@ -1342,10 +1345,12 @@ dstream.foreachRDD { rdd =>
13421345<div data-lang =" java " markdown =" 1 " >
13431346{% highlight java %}
13441347dstream.foreachRDD(new VoidFunction<JavaRDD<String >>() {
1345- @Override public void call(JavaRDD<String > rdd) throws Exception {
1348+ @Override
1349+ public void call(JavaRDD<String > rdd) {
13461350 rdd.foreachPartition(new VoidFunction<Iterator<String >>() {
1347- @Override public void call(Iterator<String > partitionOfRecords) throws Exception {
1348- Connection connection = new Connection(connectionString);
1351+ @Override
1352+ public void call(Iterator<String > partitionOfRecords) {
1353+ Connection connection = createNewConnection();
13491354 while (partitionOfRecords.hasNext()) {
13501355 connection.send(partitionOfRecords.next());
13511356 }
@@ -1392,14 +1397,17 @@ dstream.foreachRDD { rdd =>
13921397<div data-lang =" java " markdown =" 1 " >
13931398{% highlight java %}
13941399dstream.foreachRDD(new VoidFunction<JavaRDD<String >>() {
1395- @Override public void call(JavaRDD<String > rdd) throws Exception {
1400+ @Override
1401+ public void call(JavaRDD<String > rdd) {
13961402 rdd.foreachPartition(new VoidFunction<Iterator<String >>() {
1397- @Override public void call(Iterator<String > partitionOfRecords) throws Exception {
1398- Connection connection = ConnectionPool.getConnection(connectionString);
1403+ @Override
1404+ public void call(Iterator<String > partitionOfRecords) {
1405+ // ConnectionPool is a static, lazily initialized pool of connections
1406+ Connection connection = ConnectionPool.getConnection();
13991407 while (partitionOfRecords.hasNext()) {
14001408 connection.send(partitionOfRecords.next());
14011409 }
1402- ConnectionPool.releaseConnection (connection);
1410+ ConnectionPool.returnConnection (connection); // return to the pool for future reuse
14031411 }
14041412 });
14051413 }
0 commit comments