Skip to content

Commit ca80e9a

Browse files
committed
remove plain pointer in plasma client, part II
1 parent 627b7c7 commit ca80e9a

File tree

2 files changed

+22
-28
lines changed

2 files changed

+22
-28
lines changed

cpp/src/plasma/client.cc

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,6 @@ constexpr int64_t kThreadPoolSize = 8;
5656
constexpr int64_t kBytesInMB = 1 << 20;
5757
static std::vector<std::thread> threadpool_(kThreadPoolSize);
5858

59-
struct ObjectInUseEntry {
60-
/// A count of the number of times this client has called PlasmaClient::Create
61-
/// or
62-
/// PlasmaClient::Get on this object ID minus the number of calls to
63-
/// PlasmaClient::Release.
64-
/// When this count reaches zero, we remove the entry from the ObjectsInUse
65-
/// and decrement a count in the relevant ClientMmapTableEntry.
66-
int count;
67-
/// Cached information to read the object.
68-
PlasmaObject object;
69-
/// A flag representing whether the object has been sealed.
70-
bool is_sealed;
71-
};
72-
7359
// If the file descriptor fd has been mmapped in this client process before,
7460
// return the pointer that was returned by mmap, otherwise mmap it and store the
7561
// pointer in a hash table.
@@ -108,11 +94,11 @@ void increment_object_count(
10894
if (elem == conn->objects_in_use.end()) {
10995
// Add this object ID to the hash table of object IDs in use. The
11096
// corresponding call to free happens in PlasmaClient::Release.
111-
object_entry = new ObjectInUseEntry();
112-
object_entry->object = *object;
113-
object_entry->count = 0;
114-
object_entry->is_sealed = is_sealed;
115-
conn->objects_in_use[object_id] = object_entry;
97+
conn->objects_in_use[object_id] = std::unique_ptr<ObjectInUseEntry>(new ObjectInUseEntry());
98+
conn->objects_in_use[object_id]->object = *object;
99+
conn->objects_in_use[object_id]->count = 0;
100+
conn->objects_in_use[object_id]->is_sealed = is_sealed;
101+
object_entry = conn->objects_in_use[object_id].get();
116102
// Increment the count of the number of objects in the memory-mapped file
117103
// that are being used. The corresponding decrement should happen in
118104
// PlasmaClient::Release.
@@ -124,7 +110,7 @@ void increment_object_count(
124110
(object_entry->object.data_size + object_entry->object.metadata_size);
125111
entry->second.count += 1;
126112
} else {
127-
object_entry = elem->second;
113+
object_entry = elem->second.get();
128114
ARROW_CHECK(object_entry->count > 0);
129115
}
130116
// Increment the count of the number of instances of this object that are
@@ -306,7 +292,6 @@ Status PlasmaClient::PerformRelease(ObjectID object_id) {
306292
object_entry->second->object.metadata_size);
307293
DCHECK_GE(in_use_object_bytes, 0);
308294
// Remove the entry from the hash table of objects currently in use.
309-
delete object_entry->second;
310295
objects_in_use.erase(object_id);
311296
}
312297
return Status::OK();
@@ -497,11 +482,8 @@ Status PlasmaClient::Connect(const std::string& store_socket_name,
497482
Status PlasmaClient::Disconnect() {
498483
// NOTE: We purposefully do not finish sending release calls for objects in
499484
// use, so that we don't duplicate PlasmaClient::Release calls (when handling
500-
// a
501-
// SIGTERM, for example).
502-
for (auto& entry : objects_in_use) {
503-
delete entry.second;
504-
}
485+
// a SIGTERM, for example).
486+
505487
// Close the connections to Plasma. The Plasma store will release the objects
506488
// that were in use by us when handling the SIGPIPE.
507489
close(store_conn);

cpp/src/plasma/client.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,19 @@ struct ClientMmapTableEntry {
6363
int count;
6464
};
6565

66-
struct ObjectInUseEntry;
66+
struct ObjectInUseEntry {
67+
/// A count of the number of times this client has called PlasmaClient::Create
68+
/// or
69+
/// PlasmaClient::Get on this object ID minus the number of calls to
70+
/// PlasmaClient::Release.
71+
/// When this count reaches zero, we remove the entry from the ObjectsInUse
72+
/// and decrement a count in the relevant ClientMmapTableEntry.
73+
int count;
74+
/// Cached information to read the object.
75+
PlasmaObject object;
76+
/// A flag representing whether the object has been sealed.
77+
bool is_sealed;
78+
};
6779

6880
class PlasmaClient {
6981
public:
@@ -290,7 +302,7 @@ class PlasmaClient {
290302
std::unordered_map<int, ClientMmapTableEntry> mmap_table;
291303
/// A hash table of the object IDs that are currently being used by this
292304
/// client.
293-
std::unordered_map<ObjectID, ObjectInUseEntry*, UniqueIDHasher> objects_in_use;
305+
std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>, UniqueIDHasher> objects_in_use;
294306
/// Object IDs of the last few release calls. This is a deque and
295307
/// is used to delay releasing objects to see if they can be reused by
296308
/// subsequent tasks so we do not unneccessarily invalidate cpu caches.

0 commit comments

Comments
 (0)