blob: daaa632070af26153d6521f6c757ded5f6623873 [file] [log] [blame] [edit]
#![cfg(feature = "invocation")]
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Barrier,
},
thread::spawn,
time::Duration,
};
use jni::{objects::AutoLocal, sys::jint, Executor};
use rusty_fork::rusty_fork_test;
mod util;
use util::{jvm, AtomicIntegerProxy};
/// Drives the single-thread scenario with an `Executor` wrapping the shared test JVM.
#[test]
fn single_thread() {
    test_single_thread(Executor::new(jvm().clone()));
}
/// Drives the serialized-threads scenario with an `Executor` wrapping the shared test JVM.
#[test]
fn serialized_threads() {
    test_serialized_threads(Executor::new(jvm().clone()));
}
/// Drives the concurrent scenario with `THREAD_NUM` worker threads sharing one `Executor`.
#[test]
fn concurrent_threads() {
    const THREAD_NUM: usize = 8;
    let shared_executor = Executor::new(jvm().clone());
    test_concurrent_threads(shared_executor, THREAD_NUM);
}
/// Exercises an `AtomicIntegerProxy` from the calling thread only.
///
/// Starts the counter at 0, then checks the value after each of: a read, an
/// increment, an add of 2, and a final read.
fn test_single_thread(executor: Executor) {
    let mut counter = AtomicIntegerProxy::new(executor, 0).unwrap();

    // Initial value is visible through the proxy.
    assert_eq!(0, counter.get().unwrap());

    // 0 -> 1 -> 3
    assert_eq!(1, counter.increment_and_get().unwrap());
    assert_eq!(3, counter.add_and_get(2).unwrap());

    // A plain read afterwards observes the last written value.
    assert_eq!(3, counter.get().unwrap());
}
/// Exercises an `AtomicIntegerProxy` from two threads, one after the other.
///
/// The proxy is read on the calling thread, moved into a spawned thread that
/// mutates it, and then handed back through the join handle for a final read.
fn test_serialized_threads(executor: Executor) {
    let mut counter = AtomicIntegerProxy::new(executor, 0).unwrap();
    assert_eq!(0, counter.get().unwrap());

    // The worker takes ownership of the proxy and returns it when done, so
    // the two threads never touch it at the same time.
    let mut counter = spawn(move || {
        assert_eq!(1, counter.increment_and_get().unwrap());
        assert_eq!(3, counter.add_and_get(2).unwrap());
        counter
    })
    .join()
    .unwrap();

    assert_eq!(3, counter.get().unwrap());
}
/// Hammers one `AtomicIntegerProxy` from `thread_num` threads at once.
///
/// Every worker gets its own clone of the proxy, waits on a shared barrier so
/// that all workers start together, and then increments `ITERS_PER_THREAD`
/// times. Once all workers are joined the counter must equal
/// `ITERS_PER_THREAD * thread_num`.
fn test_concurrent_threads(executor: Executor, thread_num: usize) {
    const ITERS_PER_THREAD: usize = 10_000;

    let mut counter = AtomicIntegerProxy::new(executor, 0).unwrap();
    let start_gate = Arc::new(Barrier::new(thread_num));

    // `collect` is eager, so every worker is spawned before any is joined.
    let workers: Vec<_> = (0..thread_num)
        .map(|_| {
            let start_gate = Arc::clone(&start_gate);
            let mut counter = counter.clone();
            spawn(move || {
                start_gate.wait();
                for _ in 0..ITERS_PER_THREAD {
                    counter.increment_and_get().unwrap();
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let expected = (ITERS_PER_THREAD * thread_num) as jint;
    assert_eq!(expected, counter.get().unwrap());
}
// We need to test `JavaVM::destroy()` in a separate process; otherwise it would
// break all the other tests.
rusty_fork_test! {
// Checks that `JavaVM::destroy()` can be called while other threads are (or
// were) attached to the JVM.
//
// The test spawns `THREAD_NUM` threads attached with `attach_current_thread()`
// and `DAEMON_THREAD_NUM` threads attached with
// `attach_current_thread_as_daemon()`. Every spawned thread makes a token JNI
// call (`find_class`) and bumps the shared `atomic` counter once before it
// finishes. The main thread destroys the JVM, joins all threads and finally
// asserts that the counter equals the total number of spawned threads.
#[test]
fn test_destroy() {
const THREAD_NUM: usize = 2;
const DAEMON_THREAD_NUM: usize = 2;
static MATH_CLASS: &str = "java/lang/Math";
// We don't test this using an `Executor` because we don't want to
// attach all the threads as daemon threads.
let jvm = jvm().clone();
// Counts how many spawned threads ran to completion.
let atomic = Arc::new(AtomicUsize::new(0));
// Participants: every spawned thread (regular and daemon) plus the main thread.
let attach_barrier = Arc::new(Barrier::new(THREAD_NUM + DAEMON_THREAD_NUM + 1));
// Participants: every daemon thread plus the main thread.
let daemons_detached_barrier = Arc::new(Barrier::new(DAEMON_THREAD_NUM + 1));
let mut threads = Vec::new();
// Spawn the threads that attach with `attach_current_thread()`.
for _ in 0..THREAD_NUM {
let attach_barrier = Arc::clone(&attach_barrier);
let jvm = jvm.clone();
let atomic = atomic.clone();
let jh = spawn(move || {
let mut env = jvm.attach_current_thread().unwrap();
println!("java thread attach");
attach_barrier.wait();
println!("java thread run");
// NOTE(review): presumably this delay gives the main thread time to reach
// `jvm.destroy()` while this thread is still attached - confirm against the
// `JavaVM::destroy()` documentation.
std::thread::sleep(Duration::from_millis(250));
println!("use before destroy...");
// Make some token JNI call
let _class = AutoLocal::new(env.find_class(MATH_CLASS).unwrap(), &env);
atomic.fetch_add(1, Ordering::SeqCst);
println!("java thread finished");
});
threads.push(jh);
}
// Spawn the threads that attach with `attach_current_thread_as_daemon()`.
for _ in 0..DAEMON_THREAD_NUM {
let attach_barrier = Arc::clone(&attach_barrier);
let daemons_detached_barrier = Arc::clone(&daemons_detached_barrier);
let jvm = jvm.clone();
let atomic = atomic.clone();
let jh = spawn(move || {
// We have to be _very_ careful to ensure we have finished accessing the
// JavaVM before it gets destroyed, including dropping the AutoLocal
// for the `MATH_CLASS`
{
let mut env = jvm.attach_current_thread_as_daemon().unwrap();
println!("daemon thread attach");
attach_barrier.wait();
println!("daemon thread run");
println!("daemon JVM use before destroy...");
let _class = AutoLocal::new(env.find_class(MATH_CLASS).unwrap(), &env);
}
// For it to be safe to call `JavaVM::destroy()` we need to ensure that
// daemon threads are detached from the JavaVM ahead of time because
// `JavaVM::destroy()` does not synchronize and wait for them to exit
// which means we would effectively trigger a use-after-free when daemon
// threads exit and they try to automatically detach from the `JavaVM`
//
// # Safety
// We won't be accessing any (invalid) `JNIEnv` once we have detached this
// thread
unsafe {
jvm.detach_current_thread();
}
// Tell the main thread that this daemon thread no longer uses the JVM.
daemons_detached_barrier.wait();
// Keep the (now detached) thread alive for roughly another second without
// touching the JVM; no JNI calls are made in this loop.
for _ in 0..10 {
std::thread::sleep(Duration::from_millis(100));
println!("daemon thread running");
}
atomic.fetch_add(1, Ordering::SeqCst);
println!("daemon thread finished");
});
threads.push(jh);
}
// At this point we at least know that all threads have been attached
// to the JVM
println!("MAIN: waiting for threads attached barrier");
attach_barrier.wait();
// Before we try and destroy the JavaVM we need to be sure that the daemon
// threads have finished using the VM since `jvm.destroy()` won't wait
// for daemon threads to exit.
println!("MAIN: waiting for daemon threads detached barrier");
daemons_detached_barrier.wait();
// # Safety
//
// We drop the `jvm` variable immediately after `destroy()` returns to avoid
// any use-after-free.
unsafe {
println!("MAIN: calling DestroyJavaVM()...");
jvm.destroy().unwrap();
drop(jvm);
println!("MAIN: jvm destroyed");
}
println!("MAIN: joining (waiting for) all threads");
// Join every spawned thread, logging progress; a panic in any thread fails
// the test through `unwrap()`.
let mut joined = 0;
for jh in threads {
jh.join().unwrap();
joined += 1;
println!(
"joined {joined} threads, atomic = {}",
atomic.load(Ordering::SeqCst)
);
}
// Every spawned thread increments `atomic` exactly once before finishing.
assert_eq!(
atomic.load(Ordering::SeqCst),
THREAD_NUM + DAEMON_THREAD_NUM
);
}
}