Analyzing multi-gigabyte JSON files locally with serde

2023-03-20

Recently I read a very interesting article about running queries over large JSON documents in an almost interactive way. As is often the case, I immediately thought about doing the same in Rust (RIIR syndrome?). I was curious how fast (or slow) the 'default' way of doing it in Rust would be. This is obviously quite pointless, since the approach in the article is nice and easy. Read on only if you want to waste some time on a pointless exercise in Rust!

Before we start, keep in mind that you can find everything I show here in this repository.

The problem starts with a multi-gigabyte JSON document where each line is a separate JSON object. Sadly, the author can't share that file with us, but provides one such object as an example. I've duplicated that object 22214784 times to get a ~18G test input.
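
Duplicating a single line that many times is easy to script. Here is a minimal sketch of one way to do it, using only the standard library; `write_repeated` is a hypothetical helper, not necessarily what the repository does:

```rust
use std::io::{self, BufWriter, Write};

/// Writes `line` followed by a newline to `out`, `n` times.
/// Hypothetical helper for illustration only.
fn write_repeated<W: Write>(out: W, line: &str, n: usize) -> io::Result<()> {
    // Buffer the writes so that millions of short lines do not each hit the OS.
    let mut out = BufWriter::new(out);
    for _ in 0..n {
        out.write_all(line.as_bytes())?;
        out.write_all(b"\n")?;
    }
    out.flush()
}
```

Passing a `File` created with `File::create` as `out`, the example object as `line`, and 22214784 as `n` should produce a file of the same shape as the test input described above.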

serde_json::Value

Now that we have some input, we can start writing some Rust. When I mentioned the default Rust way earlier, I was thinking about serde, and in our case serde_json. We will use its most general type, Value, which can represent arbitrary JSON values. To read that huge file, we will use mmap and let the OS worry about which pages are needed and which can be discarded. Finally, we will use rayon to put all the cores/threads to work. This solution lacks a few features from the original post: there is no easy-to-use query language and no output (since with our test input, the output would be all 18G of JSON). But we don't have to worry about block size, and it can easily run on machines with little RAM. The core of this solution is this snippet:

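fn main() -> Result<()> {
    // ...
    let f = File::open(opt.input)?;
    // Safety: the mapping is only sound while nothing else modifies or truncates the file.
    let mmap = unsafe { MmapOptions::new().map(&f)? };
    let bytes: &[u8] = mmap.as_ref();
    let n = bytes
        .lines() // split the mapped bytes into lines
        .par_bridge() // hand the lines over to rayon's thread pool
        .flat_map(from_slice::<serde_json::Value>) // lines that fail to parse are silently skipped
        .filter(pred)
        .count();
    // ...
}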

fn pred(v: &impl Queryable) -> bool {
    v.get_all("subArts")
        .into_iter()
        .flat_map(|v| v.get_all("subSubArts"))
        .flat_map(|v| v.get_all("size"))
        .any(|v| v.contains("snug"))
}

impl Queryable for serde_json::Value {
    fn get_all(&self, key: &'static str) -> Vec<&serde_json::Value> {
        match self {
            serde_json::Value::Object(v) => v.iter().filter(|(k, _)| **k == key).map(|(_, v)| v).collect(),
            serde_json::Value::Array(v) => v.iter().flat_map(|e| e.get_all(key)).collect(),
            _ => vec![],
        }
    }
    fn contains(&self, arg: &str) -> bool {
        match self {
            serde_json::Value::String(v) => v.contains(arg),
            _ => false,
        }
    }
}
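
The snippet above does not show the Queryable trait itself. The sketch below is my guess at its shape, inferred from how pred and the impl use it. To keep it free of dependencies it runs against a toy value type (`Toy`, a hypothetical stand-in for serde_json::Value) rather than the real one:

```rust
/// Assumed shape of the `Queryable` trait; the post does not show its definition.
trait Queryable {
    fn get_all(&self, key: &'static str) -> Vec<&Self>;
    fn contains(&self, arg: &str) -> bool;
}

/// Toy stand-in for `serde_json::Value` (hypothetical, for illustration only).
enum Toy {
    Str(String),
    Array(Vec<Toy>),
    Object(Vec<(String, Toy)>),
}

impl Queryable for Toy {
    fn get_all(&self, key: &'static str) -> Vec<&Toy> {
        match self {
            // An object yields the values stored under `key`.
            Toy::Object(kv) => kv
                .iter()
                .filter(|(k, _)| k.as_str() == key)
                .map(|(_, v)| v)
                .collect(),
            // An array forwards the lookup to each of its elements.
            Toy::Array(items) => items.iter().flat_map(|e| e.get_all(key)).collect(),
            // Scalars have no keys.
            _ => vec![],
        }
    }

    fn contains(&self, arg: &str) -> bool {
        matches!(self, Toy::Str(s) if s.contains(arg))
    }
}

/// Same predicate as in the post: does any subArts/subSubArts/size string contain "snug"?
fn pred(v: &impl Queryable) -> bool {
    v.get_all("subArts")
        .into_iter()
        .flat_map(|v| v.get_all("subSubArts"))
        .flat_map(|v| v.get_all("size"))
        .any(|v| v.contains("snug"))
}
```

Because arrays forward the lookup to their elements, the query does not care whether subArts holds a single object or a list of them, which is roughly what a jq path with `[]` steps would express.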

Is this already fast enough? The original solution with parallel and jq was able to process a similarly sized file on an 8-core/16-thread Ryzen CPU in around 30s. This Rust approach on a 12-core/16-thread i7-1260P runs in around half that time:

Benchmark 2: ./target/release/json-big --input 18_7_G.json --value-type serde
Time (mean ± σ): 16.270 s ± 0.101 s [User: 228.987 s, System: 16.478 s]
Range (min … max): 16.194 s … 16.536 s 10 runs

String interning

Can we make it faster without leaving serde? Yes, we can! We can limit the number of allocations for string keys/values that repeat across all those JSON objects by using some sort of string interning. Since we are running on many threads, it is much faster (ask me how I know this...) to use a per-thread cache instead of a single global one, as this limits the amount of synchronization. The cache itself will be a HashSet of pointers to strings. To keep it simple, we will never deallocate those strings and will rely on them always being present:

thread_local! {
    static CACHE: RefCell<Cache> = RefCell::new(Cache::default());
}

#[derive(Debug, Default)]
struct Cache {
    strings: FxHashSet<Box<str>>,
}

impl Cache {
    fn get(&mut self, s: &str) -> &'static str {
        if let Some(ptr) = self.strings.get(s) {
            unsafe { transmute(&**ptr) }
        } else {
            let ptr: Box<str> = s.into();
            let ret: &'static str = unsafe { transmute(&*ptr) };
            self.strings.insert(ptr);
            ret
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct InternedString(&'static str);

impl InternedString {
    pub fn new(s: &str) -> Self {
        Self(CACHE.with(|cache| cache.borrow_mut().get(s)))
    }
}

impl Deref for InternedString {
    type Target = str;

    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}
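
The transmute calls above rely on the cache outliving every InternedString it hands out. As a comparison, here is a minimal sketch of the same per-thread idea without unsafe, swapping in Box::leak so the strings really are never freed. `POOL` and `intern` are hypothetical names, and this is not the code that was benchmarked:

```rust
use std::cell::RefCell;
use std::collections::HashSet;

thread_local! {
    // One pool per thread, so lookups need no locking.
    static POOL: RefCell<HashSet<&'static str>> = RefCell::new(HashSet::new());
}

/// Returns a `'static` string shared by all equal strings interned on this thread.
/// Hypothetical helper; a safe variant of the `Cache::get` shown above.
fn intern(s: &str) -> &'static str {
    POOL.with(|pool| {
        let mut pool = pool.borrow_mut();
        if let Some(&existing) = pool.get(s) {
            // Already interned: hand out the existing allocation.
            return existing;
        }
        // Leak the allocation on purpose: it is never freed, so the
        // reference stays valid for the rest of the program.
        let leaked: &'static str = Box::leak(s.to_owned().into_boxed_str());
        pool.insert(leaked);
        leaked
    })
}
```

Two calls with equal contents return the very same pointer, so each distinct string costs one allocation per thread no matter how often it appears in the input. The price is that the memory is never reclaimed, which seems acceptable for a short-lived command-line tool.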

With such an interned string type in place, we can write our own version of the Value type:

#[derive(Debug, PartialEq, Eq)]
pub enum ValueIntern {
    // Null, // not present in tested input
    // Bool(bool), // not present in tested input
    Number(i64), // int to skip Eq problems
    String(InternedString),
    Array(Vec<ValueIntern>),
    Map(Vec<(ValueIntern, ValueIntern)>),
}

To get it to work with serde we need to implement Deserialize for it:

impl<'de> Deserialize<'de> for ValueIntern {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        deserializer.deserialize_any(ValueVisitor {})
    }
}

struct ValueVisitor {}

impl<'de> Visitor<'de> for ValueVisitor {
    type Value = ValueIntern;

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        write!(formatter, "a json like value")
    }

    fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error>
    where
        A: MapAccess<'de>,
    {
        let mut out = vec![];
        while let Some((k, v)) = map.next_entry::<ValueIntern, ValueIntern>()? {
            out.push((k, v));
        }
        Ok(ValueIntern::Map(out))
    }

    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: SeqAccess<'de>,
    {
        let mut v = vec![];
        while let Some(e) = seq.next_element()? {
            v.push(e);
        }
        Ok(ValueIntern::Array(v))
    }

    fn visit_borrowed_str<E: serde::de::Error>(
        self,
        v: &'de str,
    ) -> std::result::Result<Self::Value, E> {
        Ok(ValueIntern::String(InternedString::new(v)))
    }

    fn visit_i64<E: serde::de::Error>(self, v: i64) -> std::result::Result<Self::Value, E> {
        Ok(ValueIntern::Number(v))
    }

    fn visit_u64<E: serde::de::Error>(self, v: u64) -> std::result::Result<Self::Value, E> {
        Ok(ValueIntern::Number(v as i64))
    }
}
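
One detail worth knowing about visit_u64: a plain `as` cast wraps around for values above i64::MAX, so for example `u64::MAX as i64` evaluates to -1. That is harmless for the test input here, but if it matters, a checked conversion is a small change. A sketch, with `to_i64_checked` being a hypothetical helper:

```rust
use std::convert::TryFrom;

/// The cast used in `visit_u64` above: wraps around for values above `i64::MAX`.
fn to_i64_wrapping(v: u64) -> i64 {
    v as i64
}

/// Checked alternative: returns `None` when `v` does not fit into an `i64`.
/// Hypothetical helper, not part of the post's code.
fn to_i64_checked(v: u64) -> Option<i64> {
    i64::try_from(v).ok()
}
```

Inside the visitor, the `None` case could presumably be turned into a deserialization error via serde's `Error::custom` instead of producing a silently wrong number.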

Nothing fancy, just the simplest possible implementation. How fast is it? It's actually slightly faster than plain serde; we gain almost 2s:

Benchmark 3: ./target/release/json-big --input 18_7_G.json --value-type intern
Time (mean ± σ): 14.414 s ± 0.047 s [User: 189.484 s, System: 24.711 s]
Range (min … max): 14.345 s … 14.493 s 10 runs

Borrowed strings

Can we still make it faster without abandoning serde? Yes! In fact, we can easily modify the interned version to get the speedup. The new version of the Value type now gets a lifetime and stores non-static string references directly:

pub enum ValueBorrow<'a> {
    Number(i64), // int to skip Eq problems
    String(&'a str),
    Array(Vec<ValueBorrow<'a>>),
    Map(Vec<(ValueBorrow<'a>, ValueBorrow<'a>)>),
}

The only difference in the Deserialize implementation is in the string handling, where we store the passed string directly instead of interning it first:

impl<'de> Deserialize<'de> for ValueBorrow<'de> {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        deserializer.deserialize_any(ValueVisitor {})
    }
}

struct ValueVisitor {}

impl<'de> Visitor<'de> for ValueVisitor {
    type Value = ValueBorrow<'de>;

    // ...

    fn visit_borrowed_str<E: serde::de::Error>(
        self,
        v: &'de str,
    ) -> std::result::Result<Self::Value, E> {
        Ok(ValueBorrow::String(v))
    }

    // ...
}
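
What makes this cheap is that a borrowed &str is just a pointer and a length into the input buffer (here, the mmap), so no string data is copied or allocated. The sketch below illustrates that with plain std; `borrow_lines` and `points_into` are hypothetical helpers, not part of the benchmarked code:

```rust
/// Splits `input` into lines without copying: every returned `&str`
/// points into `input` and cannot outlive it.
fn borrow_lines<'a>(input: &'a str) -> Vec<&'a str> {
    input.lines().collect()
}

/// Returns true when `part` lies inside the memory occupied by `whole`.
fn points_into(whole: &str, part: &str) -> bool {
    let start = whole.as_ptr() as usize;
    let end = start + whole.len();
    let p = part.as_ptr() as usize;
    start <= p && p + part.len() <= end
}
```

Every slice returned by `borrow_lines` satisfies `points_into`, while a copy made with `to_string()` lives in its own allocation. One caveat, as far as I understand serde_json: a string can only be borrowed when it contains no escape sequences, because an escaped string has to be decoded into a scratch buffer first. Since the visitor above only implements visit_borrowed_str, a line containing such a string would presumably fail to deserialize and be skipped.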

If we rerun the benchmark, we will see another 3s improvement, which brings us down to around 11s:

Benchmark 1: ./target/release/json-big --input 18_7_G.json --value-type borrow
Time (mean ± σ): 11.264 s ± 1.088 s [User: 119.407 s, System: 35.480 s]
Range (min … max): 9.935 s … 13.314 s 10 runs
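
To put the three results side by side, here is a small sketch that turns the mean times reported above into relative speedups. The constants are copied from the benchmark output; `speedup` is a hypothetical helper:

```rust
/// Mean wall-clock times in seconds, copied from the benchmark output in this post.
const SERDE: f64 = 16.270;
const INTERN: f64 = 14.414;
const BORROW: f64 = 11.264;

/// How many times faster `new` is than `baseline`.
fn speedup(baseline: f64, new: f64) -> f64 {
    baseline / new
}
```

With these numbers, `speedup(SERDE, INTERN)` comes out at roughly 1.13 and `speedup(SERDE, BORROW)` at roughly 1.44.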

And that's it - no point in wasting more time.