Skip to content

Commit 6539e1e

Browse files
RED-134847 support for active defrag for RedisJSON. (#1262)
The PR adds support for active defrag on RedisJSON. A pre condition for this PR is that the following PR's will be megred: * [Redis defrad module API extentions](redis/redis#13509) * [redismodule-rs support for active defrag API](RedisLabsModules/redismodule-rs#387) * [IJSON support for defrag](RedisJSON/ijson#1) The PR register defrag function on the json datatype and uses the new capability of ISON to defrag the key. **Notice**: * Increamental defrag of the json key is **not** support. In order to support it we need to implement the free_effort callback. This is not trivial and even if it was it would have cause the json object to potentially be freed when the GIL is not hold, which violate the assumption of our shared string implementation (it is not thread safe). We leave it for future improvment. * If we run on a Redis version that do not support the defrag start callback, we can still partially support defrag. In that case the IJSON object will be defraged but the shared strings dictionatrywill not be reinitialize. This basically means that shared strings will not be defraged. Tests were added to cover the new functionality.
1 parent 1e82622 commit 6539e1e

File tree

7 files changed

+255
-14
lines changed

7 files changed

+255
-14
lines changed

Cargo.lock

+8-10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ members = [
77
]
88

99
[workspace.dependencies]
10-
ijson = { git="https://github.com/RedisJSON/ijson", rev="e0119ac74f6c4ee918718ee122c3948b74ebeba8", default_features=false}
10+
ijson = { git="https://github.com/RedisJSON/ijson", rev="eede48fad51b4ace5043d3e0714f5a65481a065d", default_features=false}
1111
serde_json = { version="1", features = ["unbounded_depth"]}
1212
serde = { version = "1", features = ["derive"] }
1313
serde_derive = "1"

redis_json/Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ ijson.workspace = true
2525
serde_json.workspace = true
2626
serde.workspace = true
2727
libc = "0.2"
28-
redis-module ={ version = "^2.0.7", default-features = false, features = ["min-redis-compatibility-version-7-2"] }
29-
redis-module-macros = "^2.0.7"
28+
redis-module ={ git="https://github.com/RedisLabsModules/redismodule-rs", tag="v2.0.8", default-features = false, features = ["min-redis-compatibility-version-7-2"] }
29+
redis-module-macros = { git="https://github.com/RedisLabsModules/redismodule-rs", tag="v2.0.8" }
3030
itertools = "0.13"
3131
json_path = {path="../json_path"}
3232
linkme = "0.3"
33+
lazy_static = "1"
3334

3435
[features]
3536
as-library = []

redis_json/src/commands.rs

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* the Server Side Public License v1 (SSPLv1).
55
*/
66

7+
use crate::defrag::defrag_info;
78
use crate::error::Error;
89
use crate::formatter::ReplyFormatOptions;
910
use crate::key_value::KeyValue;
@@ -1817,6 +1818,7 @@ pub fn json_debug<M: Manager>(manager: M, ctx: &Context, args: Vec<RedisString>)
18171818
.into())
18181819
}
18191820
}
1821+
"DEFRAG_INFO" => defrag_info(ctx),
18201822
"HELP" => {
18211823
let results = vec![
18221824
"MEMORY <key> [path] - reports memory usage",

redis_json/src/defrag.rs

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
use std::{
2+
alloc::Layout,
3+
os::raw::{c_int, c_void},
4+
};
5+
6+
use ijson::{Defrag, DefragAllocator};
7+
use lazy_static::lazy_static;
8+
use redis_module::{
9+
defrag::DefragContext, raw, redisvalue::RedisValueKey, Context, RedisGILGuard, RedisResult,
10+
RedisValue,
11+
};
12+
use redis_module_macros::{defrag_end_function, defrag_start_function};
13+
14+
use crate::redisjson::RedisJSON;
15+
16+
#[derive(Default)]
17+
pub(crate) struct DefragStats {
18+
defrag_started: usize,
19+
defrag_ended: usize,
20+
keys_defrag: usize,
21+
}
22+
23+
lazy_static! {
24+
pub(crate) static ref DEFRAG_STATS: RedisGILGuard<DefragStats> = RedisGILGuard::default();
25+
}
26+
27+
struct DefragCtxAllocator<'dc> {
28+
defrag_ctx: &'dc DefragContext,
29+
}
30+
31+
impl<'dc> DefragAllocator for DefragCtxAllocator<'dc> {
32+
unsafe fn realloc_ptr<T>(&mut self, ptr: *mut T, _layout: Layout) -> *mut T {
33+
self.defrag_ctx.defrag_realloc(ptr)
34+
}
35+
36+
/// Allocate memory for defrag
37+
unsafe fn alloc(&mut self, layout: Layout) -> *mut u8 {
38+
self.defrag_ctx.defrag_alloc(layout)
39+
}
40+
41+
/// Free memory for defrag
42+
unsafe fn free<T>(&mut self, ptr: *mut T, layout: Layout) {
43+
self.defrag_ctx.defrag_dealloc(ptr, layout)
44+
}
45+
}
46+
47+
#[defrag_start_function]
48+
fn defrag_start(defrag_ctx: &DefragContext) {
49+
let mut defrag_stats = DEFRAG_STATS.lock(defrag_ctx);
50+
defrag_stats.defrag_started += 1;
51+
ijson::reinit_shared_string_cache();
52+
}
53+
54+
#[defrag_end_function]
55+
fn defrag_end(defrag_ctx: &DefragContext) {
56+
let mut defrag_stats = DEFRAG_STATS.lock(defrag_ctx);
57+
defrag_stats.defrag_ended += 1;
58+
}
59+
60+
#[allow(non_snake_case, unused)]
61+
pub unsafe extern "C" fn defrag(
62+
ctx: *mut raw::RedisModuleDefragCtx,
63+
key: *mut raw::RedisModuleString,
64+
value: *mut *mut c_void,
65+
) -> c_int {
66+
let defrag_ctx = DefragContext::new(ctx);
67+
68+
let mut defrag_stats = DEFRAG_STATS.lock(&defrag_ctx);
69+
defrag_stats.keys_defrag += 1;
70+
71+
let mut defrag_allocator = DefragCtxAllocator {
72+
defrag_ctx: &defrag_ctx,
73+
};
74+
let value = value.cast::<*mut RedisJSON<ijson::IValue>>();
75+
let new_val = defrag_allocator.realloc_ptr(*value, Layout::new::<RedisJSON<ijson::IValue>>());
76+
if !new_val.is_null() {
77+
std::ptr::write(value, new_val);
78+
}
79+
std::ptr::write(
80+
&mut (**value).data as *mut ijson::IValue,
81+
std::ptr::read(*value).data.defrag(&mut defrag_allocator),
82+
);
83+
0
84+
}
85+
86+
pub(crate) fn defrag_info(ctx: &Context) -> RedisResult {
87+
let defrag_stats = DEFRAG_STATS.lock(ctx);
88+
Ok(RedisValue::OrderedMap(
89+
[
90+
(
91+
RedisValueKey::String("defrag_started".to_owned()),
92+
RedisValue::Integer(defrag_stats.defrag_started as i64),
93+
),
94+
(
95+
RedisValueKey::String("defrag_ended".to_owned()),
96+
RedisValue::Integer(defrag_stats.defrag_ended as i64),
97+
),
98+
(
99+
RedisValueKey::String("keys_defrag".to_owned()),
100+
RedisValue::Integer(defrag_stats.keys_defrag as i64),
101+
),
102+
]
103+
.into_iter()
104+
.collect(),
105+
))
106+
}

redis_json/src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ mod array_index;
3636
mod backward;
3737
pub mod c_api;
3838
pub mod commands;
39+
pub mod defrag;
3940
pub mod error;
4041
mod formatter;
4142
pub mod ivalue_manager;
@@ -73,7 +74,7 @@ pub static REDIS_JSON_TYPE: RedisType = RedisType::new(
7374
free_effort: None,
7475
unlink: None,
7576
copy: Some(redisjson::type_methods::copy),
76-
defrag: None,
77+
defrag: Some(defrag::defrag),
7778

7879
free_effort2: None,
7980
unlink2: None,

tests/pytest/test_defrag.py

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import time
2+
import json
3+
from RLTest import Defaults
4+
5+
Defaults.decode_responses = True
6+
7+
def enableDefrag(env):
8+
# make defrag as aggressive as possible
9+
env.cmd('CONFIG', 'SET', 'hz', '100')
10+
env.cmd('CONFIG', 'SET', 'active-defrag-ignore-bytes', '1')
11+
env.cmd('CONFIG', 'SET', 'active-defrag-threshold-lower', '0')
12+
env.cmd('CONFIG', 'SET', 'active-defrag-cycle-min', '99')
13+
14+
try:
15+
env.cmd('CONFIG', 'SET', 'activedefrag', 'yes')
16+
except Exception:
17+
# If active defrag is not supported by the current Redis, simply skip the test.
18+
env.skip()
19+
20+
def defragOnObj(env, obj):
21+
enableDefrag(env)
22+
json_str = json.dumps(obj)
23+
env.expect('JSON.SET', 'test', '$', json_str).ok()
24+
for i in range(10000):
25+
env.expect('JSON.SET', 'test%d' % i, '$', json_str).ok()
26+
i += 1
27+
env.expect('JSON.SET', 'test%d' % i, '$', json_str).ok()
28+
for i in range(10000):
29+
env.expect('DEL', 'test%d' % i).equal(1)
30+
i += 1
31+
_, _, _, _, _, keysDefrag = env.cmd('JSON.DEBUG', 'DEFRAG_INFO')
32+
startTime = time.time()
33+
# Wait for at least 2 defrag full cycles
34+
# We verify only the 'keysDefrag' value because the other values
35+
# are not promised to be updated. It depends if Redis support
36+
# the start/end defrag callbacks.
37+
while keysDefrag < 2:
38+
time.sleep(0.1)
39+
_, _, _, _, _, keysDefrag = env.cmd('JSON.DEBUG', 'DEFRAG_INFO')
40+
if time.time() - startTime > 30:
41+
# We will wait for up to 30 seconds and then we consider it a failure
42+
env.assertTrue(False, message='Failed waiting for defrag to run')
43+
return
44+
# make sure json is still valid.
45+
res = json.loads(env.cmd('JSON.GET', 'test%d' % i, '$'))[0]
46+
env.assertEqual(res, obj)
47+
env.assertGreater(env.cmd('info', 'Stats')['active_defrag_key_hits'], 0)
48+
49+
def testDefragNumber(env):
50+
defragOnObj(env, 1)
51+
52+
def testDefragBigNumber(env):
53+
defragOnObj(env, 100000000000000000000)
54+
55+
def testDefragDouble(env):
56+
defragOnObj(env, 1.111111111111)
57+
58+
def testDefragNegativeNumber(env):
59+
defragOnObj(env, -100000000000000000000)
60+
61+
def testDefragNegativeDouble(env):
62+
defragOnObj(env, -1.111111111111)
63+
64+
def testDefragTrue(env):
65+
defragOnObj(env, True)
66+
67+
def testDefragFalse(env):
68+
defragOnObj(env, True)
69+
70+
def testDefragNone(env):
71+
defragOnObj(env, None)
72+
73+
def testDefragEmptyString(env):
74+
defragOnObj(env, "")
75+
76+
def testDefragString(env):
77+
defragOnObj(env, "foo")
78+
79+
def testDefragEmptyArray(env):
80+
defragOnObj(env, [])
81+
82+
def testDefragArray(env):
83+
defragOnObj(env, [1, 2, 3])
84+
85+
def testDefragEmptyObject(env):
86+
defragOnObj(env, {})
87+
88+
def testDefragObject(env):
89+
defragOnObj(env, {"foo": "bar"})
90+
91+
def testDefragComplex(env):
92+
defragOnObj(env, {"foo": ["foo", 1, None, True, False, {}, {"foo": [], "bar": 1}]})
93+
94+
def testDefragBigJsons(env):
95+
enableDefrag(env)
96+
97+
# Disable defrag so we can actually create fragmentation
98+
env.cmd('CONFIG', 'SET', 'activedefrag', 'no')
99+
100+
env.expect('JSON.SET', 'key1', '$', "[]").ok()
101+
env.expect('JSON.SET', 'key2', '$', "[]").ok()
102+
103+
for i in range(100000):
104+
env.cmd('JSON.ARRAPPEND', 'key1', '$', "[1.11111111111]")
105+
env.cmd('JSON.ARRAPPEND', 'key2', '$', "[1.11111111111]")
106+
107+
# Now we delete key2 which should cause fragmenation
108+
env.expect('DEL', 'key2').equal(1)
109+
110+
# wait for fragmentation for up to 30 seconds
111+
frag = env.cmd('info', 'memory')['allocator_frag_ratio']
112+
startTime = time.time()
113+
while frag < 1.4:
114+
time.sleep(0.1)
115+
frag = env.cmd('info', 'memory')['allocator_frag_ratio']
116+
if time.time() - startTime > 30:
117+
# We will wait for up to 30 seconds and then we consider it a failure
118+
env.assertTrue(False, message='Failed waiting for fragmentation, current value %s which is expected to be above 1.4.' % frag)
119+
return
120+
121+
#enable active defrag
122+
env.cmd('CONFIG', 'SET', 'activedefrag', 'yes')
123+
124+
# wait for fragmentation for go down for up to 30 seconds
125+
frag = env.cmd('info', 'memory')['allocator_frag_ratio']
126+
startTime = time.time()
127+
while frag > 1.1:
128+
time.sleep(0.1)
129+
frag = env.cmd('info', 'memory')['allocator_frag_ratio']
130+
if time.time() - startTime > 30:
131+
# We will wait for up to 30 seconds and then we consider it a failure
132+
env.assertTrue(False, message='Failed waiting for fragmentation to go down, current value %s which is expected to be bellow 1.1.' % frag)
133+
return

0 commit comments

Comments
 (0)