@@ -56,20 +56,6 @@ constexpr int64_t kThreadPoolSize = 8;
5656constexpr int64_t kBytesInMB = 1 << 20 ;
5757static std::vector<std::thread> threadpool_ (kThreadPoolSize );
5858
59- struct ObjectInUseEntry {
60- // / A count of the number of times this client has called PlasmaClient::Create
61- // / or
62- // / PlasmaClient::Get on this object ID minus the number of calls to
63- // / PlasmaClient::Release.
64- // / When this count reaches zero, we remove the entry from the ObjectsInUse
65- // / and decrement a count in the relevant ClientMmapTableEntry.
66- int count;
67- // / Cached information to read the object.
68- PlasmaObject object;
69- // / A flag representing whether the object has been sealed.
70- bool is_sealed;
71- };
72-
7359// If the file descriptor fd has been mmapped in this client process before,
7460// return the pointer that was returned by mmap, otherwise mmap it and store the
7561// pointer in a hash table.
@@ -108,11 +94,11 @@ void increment_object_count(
10894 if (elem == conn->objects_in_use .end ()) {
10995 // Add this object ID to the hash table of object IDs in use. The
11096 // corresponding call to free happens in PlasmaClient::Release.
111- object_entry = new ObjectInUseEntry ();
112- object_entry ->object = *object;
113- object_entry ->count = 0 ;
114- object_entry ->is_sealed = is_sealed;
115- conn->objects_in_use [object_id] = object_entry ;
97+ conn-> objects_in_use [object_id] = std::unique_ptr<ObjectInUseEntry>( new ObjectInUseEntry () );
98+ conn-> objects_in_use [object_id] ->object = *object;
99+ conn-> objects_in_use [object_id] ->count = 0 ;
100+ conn-> objects_in_use [object_id] ->is_sealed = is_sealed;
101+ object_entry = conn->objects_in_use [object_id]. get () ;
116102 // Increment the count of the number of objects in the memory-mapped file
117103 // that are being used. The corresponding decrement should happen in
118104 // PlasmaClient::Release.
@@ -124,7 +110,7 @@ void increment_object_count(
124110 (object_entry->object .data_size + object_entry->object .metadata_size );
125111 entry->second .count += 1 ;
126112 } else {
127- object_entry = elem->second ;
113+ object_entry = elem->second . get () ;
128114 ARROW_CHECK (object_entry->count > 0 );
129115 }
130116 // Increment the count of the number of instances of this object that are
@@ -306,7 +292,6 @@ Status PlasmaClient::PerformRelease(ObjectID object_id) {
306292 object_entry->second ->object .metadata_size );
307293 DCHECK_GE (in_use_object_bytes, 0 );
308294 // Remove the entry from the hash table of objects currently in use.
309- delete object_entry->second ;
310295 objects_in_use.erase (object_id);
311296 }
312297 return Status::OK ();
@@ -497,11 +482,8 @@ Status PlasmaClient::Connect(const std::string& store_socket_name,
497482Status PlasmaClient::Disconnect () {
498483 // NOTE: We purposefully do not finish sending release calls for objects in
499484 // use, so that we don't duplicate PlasmaClient::Release calls (when handling
500- // a
501- // SIGTERM, for example).
502- for (auto & entry : objects_in_use) {
503- delete entry.second ;
504- }
485+ // a SIGTERM, for example).
486+
505487 // Close the connections to Plasma. The Plasma store will release the objects
506488 // that were in use by us when handling the SIGPIPE.
507489 close (store_conn);
0 commit comments