| #![cfg(feature = "invocation")] |
| |
| use std::{ |
| sync::{ |
| atomic::{AtomicUsize, Ordering}, |
| Arc, Barrier, |
| }, |
| thread::spawn, |
| time::Duration, |
| }; |
| |
| use jni::{objects::AutoLocal, sys::jint, Executor}; |
| |
| use rusty_fork::rusty_fork_test; |
| |
| mod util; |
| use util::{jvm, AtomicIntegerProxy}; |
| |
| #[test] |
| fn single_thread() { |
| let executor = Executor::new(jvm().clone()); |
| test_single_thread(executor); |
| } |
| |
| #[test] |
| fn serialized_threads() { |
| let executor = Executor::new(jvm().clone()); |
| test_serialized_threads(executor); |
| } |
| |
| #[test] |
| fn concurrent_threads() { |
| let executor = Executor::new(jvm().clone()); |
| const THREAD_NUM: usize = 8; |
| test_concurrent_threads(executor, THREAD_NUM) |
| } |
| |
| fn test_single_thread(executor: Executor) { |
| let mut atomic = AtomicIntegerProxy::new(executor, 0).unwrap(); |
| assert_eq!(0, atomic.get().unwrap()); |
| assert_eq!(1, atomic.increment_and_get().unwrap()); |
| assert_eq!(3, atomic.add_and_get(2).unwrap()); |
| assert_eq!(3, atomic.get().unwrap()); |
| } |
| |
| fn test_serialized_threads(executor: Executor) { |
| let mut atomic = AtomicIntegerProxy::new(executor, 0).unwrap(); |
| assert_eq!(0, atomic.get().unwrap()); |
| let jh = spawn(move || { |
| assert_eq!(1, atomic.increment_and_get().unwrap()); |
| assert_eq!(3, atomic.add_and_get(2).unwrap()); |
| atomic |
| }); |
| let mut atomic = jh.join().unwrap(); |
| assert_eq!(3, atomic.get().unwrap()); |
| } |
| |
| fn test_concurrent_threads(executor: Executor, thread_num: usize) { |
| const ITERS_PER_THREAD: usize = 10_000; |
| |
| let mut atomic = AtomicIntegerProxy::new(executor, 0).unwrap(); |
| let barrier = Arc::new(Barrier::new(thread_num)); |
| let mut threads = Vec::new(); |
| |
| for _ in 0..thread_num { |
| let barrier = Arc::clone(&barrier); |
| let mut atomic = atomic.clone(); |
| let jh = spawn(move || { |
| barrier.wait(); |
| for _ in 0..ITERS_PER_THREAD { |
| atomic.increment_and_get().unwrap(); |
| } |
| }); |
| threads.push(jh); |
| } |
| for jh in threads { |
| jh.join().unwrap(); |
| } |
| let expected = (ITERS_PER_THREAD * thread_num) as jint; |
| assert_eq!(expected, atomic.get().unwrap()); |
| } |
| |
| // We need to test `JavaVM::destroy()` in a separate process otherwise it will break |
| // all the other tests |
| rusty_fork_test! { |
| #[test] |
| fn test_destroy() { |
| const THREAD_NUM: usize = 2; |
| const DAEMON_THREAD_NUM: usize = 2; |
| static MATH_CLASS: &str = "java/lang/Math"; |
| |
| // We don't test this using an `Executor` because we don't want to |
| // attach all the threads as daemon threads. |
| |
| let jvm = jvm().clone(); |
| |
| let atomic = Arc::new(AtomicUsize::new(0)); |
| |
| let attach_barrier = Arc::new(Barrier::new(THREAD_NUM + DAEMON_THREAD_NUM + 1)); |
| let daemons_detached_barrier = Arc::new(Barrier::new(DAEMON_THREAD_NUM + 1)); |
| let mut threads = Vec::new(); |
| |
| for _ in 0..THREAD_NUM { |
| let attach_barrier = Arc::clone(&attach_barrier); |
| let jvm = jvm.clone(); |
| let atomic = atomic.clone(); |
| let jh = spawn(move || { |
| let mut env = jvm.attach_current_thread().unwrap(); |
| println!("java thread attach"); |
| attach_barrier.wait(); |
| println!("java thread run"); |
| std::thread::sleep(Duration::from_millis(250)); |
| |
| println!("use before destroy..."); |
| // Make some token JNI call |
| let _class = AutoLocal::new(env.find_class(MATH_CLASS).unwrap(), &env); |
| |
| atomic.fetch_add(1, Ordering::SeqCst); |
| |
| println!("java thread finished"); |
| }); |
| threads.push(jh); |
| } |
| |
| for _ in 0..DAEMON_THREAD_NUM { |
| let attach_barrier = Arc::clone(&attach_barrier); |
| let daemons_detached_barrier = Arc::clone(&daemons_detached_barrier); |
| let jvm = jvm.clone(); |
| let atomic = atomic.clone(); |
| let jh = spawn(move || { |
| // We have to be _very_ careful to ensure we have finished accessing the |
| // JavaVM before it gets destroyed, including dropping the AutoLocal |
| // for the `MATH_CLASS` |
| { |
| let mut env = jvm.attach_current_thread_as_daemon().unwrap(); |
| println!("daemon thread attach"); |
| attach_barrier.wait(); |
| println!("daemon thread run"); |
| |
| println!("daemon JVM use before destroy..."); |
| |
| let _class = AutoLocal::new(env.find_class(MATH_CLASS).unwrap(), &env); |
| } |
| |
| // For it to be safe to call `JavaVM::destroy()` we need to ensure that |
| // daemon threads are detached from the JavaVM ahead of time because |
| // `JavaVM::destroy()` does not synchronize and wait for them to exit |
| // which means we would effectively trigger a use-after-free when daemon |
| // threads exit and they try to automatically detach from the `JavaVM` |
| // |
| // # Safety |
| // We won't be accessing any (invalid) `JNIEnv` once we have detached this |
| // thread |
| unsafe { |
| jvm.detach_current_thread(); |
| } |
| |
| daemons_detached_barrier.wait(); |
| |
| for _ in 0..10 { |
| std::thread::sleep(Duration::from_millis(100)); |
| println!("daemon thread running"); |
| } |
| |
| atomic.fetch_add(1, Ordering::SeqCst); |
| |
| println!("daemon thread finished"); |
| }); |
| threads.push(jh); |
| } |
| |
| // At this point we at least know that all threads have been attached |
| // to the JVM |
| println!("MAIN: waiting for threads attached barrier"); |
| attach_barrier.wait(); |
| |
| // Before we try and destroy the JavaVM we need to be sure that the daemon |
| // threads have finished using the VM since `jvm.destroy()` won't wait |
| // for daemon threads to exit. |
| println!("MAIN: waiting for daemon threads detached barrier"); |
| daemons_detached_barrier.wait(); |
| |
| // # Safety |
| // |
| // We drop the `jvm` variable immediately after `destroy()` returns to avoid |
| // any use-after-free. |
| unsafe { |
| println!("MAIN: calling DestroyJavaVM()..."); |
| jvm.destroy().unwrap(); |
| drop(jvm); |
| println!("MAIN: jvm destroyed"); |
| } |
| |
| println!("MAIN: joining (waiting for) all threads"); |
| let mut joined = 0; |
| for jh in threads { |
| jh.join().unwrap(); |
| joined += 1; |
| println!( |
| "joined {joined} threads, atomic = {}", |
| atomic.load(Ordering::SeqCst) |
| ); |
| } |
| |
| assert_eq!( |
| atomic.load(Ordering::SeqCst), |
| THREAD_NUM + DAEMON_THREAD_NUM |
| ); |
| } |
| |
| } |