| //! A language server scaffold, exposing a synchronous crossbeam-channel based API. |
| //! This crate handles protocol handshaking and parsing messages, while you |
| //! control the message dispatch loop yourself. |
| //! |
| //! Run with `RUST_LOG=lsp_server=debug` to see all the messages. |
| |
| #![warn(rust_2018_idioms, unused_lifetimes)] |
| #![allow(clippy::print_stdout, clippy::disallowed_types)] |
| |
| mod error; |
| mod msg; |
| mod req_queue; |
| mod socket; |
| mod stdio; |
| |
| use std::{ |
| io, |
| net::{TcpListener, TcpStream, ToSocketAddrs}, |
| }; |
| |
| use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender}; |
| |
| pub use crate::{ |
| error::{ExtractError, ProtocolError}, |
| msg::{ErrorCode, Message, Notification, Request, RequestId, Response, ResponseError}, |
| req_queue::{Incoming, Outgoing, ReqQueue}, |
| stdio::IoThreads, |
| }; |
| |
| /// Connection is just a pair of channels of LSP messages. |
| pub struct Connection { |
| pub sender: Sender<Message>, |
| pub receiver: Receiver<Message>, |
| } |
| |
| impl Connection { |
| /// Create connection over standard in/standard out. |
| /// |
| /// Use this to create a real language server. |
| pub fn stdio() -> (Connection, IoThreads) { |
| let (sender, receiver, io_threads) = stdio::stdio_transport(); |
| (Connection { sender, receiver }, io_threads) |
| } |
| |
| /// Open a connection over tcp. |
| /// This call blocks until a connection is established. |
| /// |
| /// Use this to create a real language server. |
| pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<(Connection, IoThreads)> { |
| let stream = TcpStream::connect(addr)?; |
| let (sender, receiver, io_threads) = socket::socket_transport(stream); |
| Ok((Connection { sender, receiver }, io_threads)) |
| } |
| |
| /// Listen for a connection over tcp. |
| /// This call blocks until a connection is established. |
| /// |
| /// Use this to create a real language server. |
| pub fn listen<A: ToSocketAddrs>(addr: A) -> io::Result<(Connection, IoThreads)> { |
| let listener = TcpListener::bind(addr)?; |
| let (stream, _) = listener.accept()?; |
| let (sender, receiver, io_threads) = socket::socket_transport(stream); |
| Ok((Connection { sender, receiver }, io_threads)) |
| } |
| |
| /// Creates a pair of connected connections. |
| /// |
| /// Use this for testing. |
| pub fn memory() -> (Connection, Connection) { |
| let (s1, r1) = crossbeam_channel::unbounded(); |
| let (s2, r2) = crossbeam_channel::unbounded(); |
| (Connection { sender: s1, receiver: r2 }, Connection { sender: s2, receiver: r1 }) |
| } |
| |
| /// Starts the initialization process by waiting for an initialize |
| /// request from the client. Use this for more advanced customization than |
| /// `initialize` can provide. |
| /// |
| /// Returns the request id and serialized `InitializeParams` from the client. |
| /// |
| /// # Example |
| /// |
| /// ```no_run |
| /// use std::error::Error; |
| /// use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities}; |
| /// |
| /// use lsp_server::{Connection, Message, Request, RequestId, Response}; |
| /// |
| /// fn main() -> Result<(), Box<dyn Error + Sync + Send>> { |
| /// // Create the transport. Includes the stdio (stdin and stdout) versions but this could |
| /// // also be implemented to use sockets or HTTP. |
| /// let (connection, io_threads) = Connection::stdio(); |
| /// |
| /// // Run the server |
| /// let (id, params) = connection.initialize_start()?; |
| /// |
| /// let init_params: InitializeParams = serde_json::from_value(params).unwrap(); |
| /// let client_capabilities: ClientCapabilities = init_params.capabilities; |
| /// let server_capabilities = ServerCapabilities::default(); |
| /// |
| /// let initialize_data = serde_json::json!({ |
| /// "capabilities": server_capabilities, |
| /// "serverInfo": { |
| /// "name": "lsp-server-test", |
| /// "version": "0.1" |
| /// } |
| /// }); |
| /// |
| /// connection.initialize_finish(id, initialize_data)?; |
| /// |
| /// // ... Run main loop ... |
| /// |
| /// Ok(()) |
| /// } |
| /// ``` |
| pub fn initialize_start(&self) -> Result<(RequestId, serde_json::Value), ProtocolError> { |
| self.initialize_start_while(|| true) |
| } |
| |
| /// Starts the initialization process by waiting for an initialize as described in |
| /// [`Self::initialize_start`] as long as `running` returns |
| /// `true` while the return value can be changed through a sig handler such as `CTRL + C`. |
| /// |
| /// # Example |
| /// |
| /// ```rust |
| /// use std::sync::atomic::{AtomicBool, Ordering}; |
| /// use std::sync::Arc; |
| /// # use std::error::Error; |
| /// # use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities}; |
| /// # use lsp_server::{Connection, Message, Request, RequestId, Response}; |
| /// # fn main() -> Result<(), Box<dyn Error + Sync + Send>> { |
| /// let running = Arc::new(AtomicBool::new(true)); |
| /// # running.store(true, Ordering::SeqCst); |
| /// let r = running.clone(); |
| /// |
| /// ctrlc::set_handler(move || { |
| /// r.store(false, Ordering::SeqCst); |
| /// }).expect("Error setting Ctrl-C handler"); |
| /// |
| /// let (connection, io_threads) = Connection::stdio(); |
| /// |
| /// let res = connection.initialize_start_while(|| running.load(Ordering::SeqCst)); |
| /// # assert!(res.is_err()); |
| /// |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| pub fn initialize_start_while<C>( |
| &self, |
| running: C, |
| ) -> Result<(RequestId, serde_json::Value), ProtocolError> |
| where |
| C: Fn() -> bool, |
| { |
| while running() { |
| let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) { |
| Ok(msg) => msg, |
| Err(RecvTimeoutError::Timeout) => { |
| continue; |
| } |
| Err(RecvTimeoutError::Disconnected) => return Err(ProtocolError::disconnected()), |
| }; |
| |
| match msg { |
| Message::Request(req) if req.is_initialize() => return Ok((req.id, req.params)), |
| // Respond to non-initialize requests with ServerNotInitialized |
| Message::Request(req) => { |
| let resp = Response::new_err( |
| req.id.clone(), |
| ErrorCode::ServerNotInitialized as i32, |
| format!("expected initialize request, got {req:?}"), |
| ); |
| self.sender.send(resp.into()).unwrap(); |
| continue; |
| } |
| Message::Notification(n) if !n.is_exit() => { |
| continue; |
| } |
| msg => { |
| return Err(ProtocolError::new(format!( |
| "expected initialize request, got {msg:?}" |
| ))); |
| } |
| }; |
| } |
| |
| Err(ProtocolError::new(String::from( |
| "Initialization has been aborted during initialization", |
| ))) |
| } |
| |
| /// Finishes the initialization process by sending an `InitializeResult` to the client |
| pub fn initialize_finish( |
| &self, |
| initialize_id: RequestId, |
| initialize_result: serde_json::Value, |
| ) -> Result<(), ProtocolError> { |
| let resp = Response::new_ok(initialize_id, initialize_result); |
| self.sender.send(resp.into()).unwrap(); |
| match &self.receiver.recv() { |
| Ok(Message::Notification(n)) if n.is_initialized() => Ok(()), |
| Ok(msg) => Err(ProtocolError::new(format!( |
| r#"expected initialized notification, got: {msg:?}"# |
| ))), |
| Err(RecvError) => Err(ProtocolError::disconnected()), |
| } |
| } |
| |
| /// Finishes the initialization process as described in [`Self::initialize_finish`] as |
| /// long as `running` returns `true` while the return value can be changed through a sig |
| /// handler such as `CTRL + C`. |
| pub fn initialize_finish_while<C>( |
| &self, |
| initialize_id: RequestId, |
| initialize_result: serde_json::Value, |
| running: C, |
| ) -> Result<(), ProtocolError> |
| where |
| C: Fn() -> bool, |
| { |
| let resp = Response::new_ok(initialize_id, initialize_result); |
| self.sender.send(resp.into()).unwrap(); |
| |
| while running() { |
| let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) { |
| Ok(msg) => msg, |
| Err(RecvTimeoutError::Timeout) => { |
| continue; |
| } |
| Err(RecvTimeoutError::Disconnected) => { |
| return Err(ProtocolError::disconnected()); |
| } |
| }; |
| |
| match msg { |
| Message::Notification(n) if n.is_initialized() => { |
| return Ok(()); |
| } |
| msg => { |
| return Err(ProtocolError::new(format!( |
| r#"expected initialized notification, got: {msg:?}"# |
| ))); |
| } |
| } |
| } |
| |
| Err(ProtocolError::new(String::from( |
| "Initialization has been aborted during initialization", |
| ))) |
| } |
| |
| /// Initialize the connection. Sends the server capabilities |
| /// to the client and returns the serialized client capabilities |
| /// on success. If more fine-grained initialization is required use |
| /// `initialize_start`/`initialize_finish`. |
| /// |
| /// # Example |
| /// |
| /// ```no_run |
| /// use std::error::Error; |
| /// use lsp_types::ServerCapabilities; |
| /// |
| /// use lsp_server::{Connection, Message, Request, RequestId, Response}; |
| /// |
| /// fn main() -> Result<(), Box<dyn Error + Sync + Send>> { |
| /// // Create the transport. Includes the stdio (stdin and stdout) versions but this could |
| /// // also be implemented to use sockets or HTTP. |
| /// let (connection, io_threads) = Connection::stdio(); |
| /// |
| /// // Run the server |
| /// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap(); |
| /// let initialization_params = connection.initialize(server_capabilities)?; |
| /// |
| /// // ... Run main loop ... |
| /// |
| /// Ok(()) |
| /// } |
| /// ``` |
| pub fn initialize( |
| &self, |
| server_capabilities: serde_json::Value, |
| ) -> Result<serde_json::Value, ProtocolError> { |
| let (id, params) = self.initialize_start()?; |
| |
| let initialize_data = serde_json::json!({ |
| "capabilities": server_capabilities, |
| }); |
| |
| self.initialize_finish(id, initialize_data)?; |
| |
| Ok(params) |
| } |
| |
| /// Initialize the connection as described in [`Self::initialize`] as long as `running` returns |
| /// `true` while the return value can be changed through a sig handler such as `CTRL + C`. |
| /// |
| /// # Example |
| /// |
| /// ```rust |
| /// use std::sync::atomic::{AtomicBool, Ordering}; |
| /// use std::sync::Arc; |
| /// # use std::error::Error; |
| /// # use lsp_types::ServerCapabilities; |
| /// # use lsp_server::{Connection, Message, Request, RequestId, Response}; |
| /// |
| /// # fn main() -> Result<(), Box<dyn Error + Sync + Send>> { |
| /// let running = Arc::new(AtomicBool::new(true)); |
| /// # running.store(true, Ordering::SeqCst); |
| /// let r = running.clone(); |
| /// |
| /// ctrlc::set_handler(move || { |
| /// r.store(false, Ordering::SeqCst); |
| /// }).expect("Error setting Ctrl-C handler"); |
| /// |
| /// let (connection, io_threads) = Connection::stdio(); |
| /// |
| /// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap(); |
| /// let initialization_params = connection.initialize_while( |
| /// server_capabilities, |
| /// || running.load(Ordering::SeqCst) |
| /// ); |
| /// |
| /// # assert!(initialization_params.is_err()); |
| /// # Ok(()) |
| /// # } |
| /// ``` |
| pub fn initialize_while<C>( |
| &self, |
| server_capabilities: serde_json::Value, |
| running: C, |
| ) -> Result<serde_json::Value, ProtocolError> |
| where |
| C: Fn() -> bool, |
| { |
| let (id, params) = self.initialize_start_while(&running)?; |
| |
| let initialize_data = serde_json::json!({ |
| "capabilities": server_capabilities, |
| }); |
| |
| self.initialize_finish_while(id, initialize_data, running)?; |
| |
| Ok(params) |
| } |
| |
| /// If `req` is `Shutdown`, respond to it and return `true`, otherwise return `false` |
| pub fn handle_shutdown(&self, req: &Request) -> Result<bool, ProtocolError> { |
| if !req.is_shutdown() { |
| return Ok(false); |
| } |
| let resp = Response::new_ok(req.id.clone(), ()); |
| let _ = self.sender.send(resp.into()); |
| match &self.receiver.recv_timeout(std::time::Duration::from_secs(30)) { |
| Ok(Message::Notification(n)) if n.is_exit() => (), |
| Ok(msg) => { |
| return Err(ProtocolError::new(format!( |
| "unexpected message during shutdown: {msg:?}" |
| ))) |
| } |
| Err(RecvTimeoutError::Timeout) => { |
| return Err(ProtocolError::new( |
| "timed out waiting for exit notification".to_owned(), |
| )) |
| } |
| Err(RecvTimeoutError::Disconnected) => { |
| return Err(ProtocolError::new( |
| "channel disconnected waiting for exit notification".to_owned(), |
| )) |
| } |
| } |
| Ok(true) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use crossbeam_channel::unbounded; |
| use lsp_types::notification::{Exit, Initialized, Notification}; |
| use lsp_types::request::{Initialize, Request}; |
| use lsp_types::{InitializeParams, InitializedParams}; |
| use serde_json::to_value; |
| |
| use crate::{Connection, Message, ProtocolError, RequestId}; |
| |
| struct TestCase { |
| test_messages: Vec<Message>, |
| expected_resp: Result<(RequestId, serde_json::Value), ProtocolError>, |
| } |
| |
| fn initialize_start_test(test_case: TestCase) { |
| let (reader_sender, reader_receiver) = unbounded::<Message>(); |
| let (writer_sender, writer_receiver) = unbounded::<Message>(); |
| let conn = Connection { sender: writer_sender, receiver: reader_receiver }; |
| |
| for msg in test_case.test_messages { |
| assert!(reader_sender.send(msg).is_ok()); |
| } |
| |
| let resp = conn.initialize_start(); |
| assert_eq!(test_case.expected_resp, resp); |
| |
| assert!(writer_receiver.recv_timeout(std::time::Duration::from_secs(1)).is_err()); |
| } |
| |
| #[test] |
| fn not_exit_notification() { |
| let notification = crate::Notification { |
| method: Initialized::METHOD.to_owned(), |
| params: to_value(InitializedParams {}).unwrap(), |
| }; |
| |
| let params_as_value = to_value(InitializeParams::default()).unwrap(); |
| let req_id = RequestId::from(234); |
| let request = crate::Request { |
| id: req_id.clone(), |
| method: Initialize::METHOD.to_owned(), |
| params: params_as_value.clone(), |
| }; |
| |
| initialize_start_test(TestCase { |
| test_messages: vec![notification.into(), request.into()], |
| expected_resp: Ok((req_id, params_as_value)), |
| }); |
| } |
| |
| #[test] |
| fn exit_notification() { |
| let notification = |
| crate::Notification { method: Exit::METHOD.to_owned(), params: to_value(()).unwrap() }; |
| let notification_msg = Message::from(notification); |
| |
| initialize_start_test(TestCase { |
| test_messages: vec![notification_msg.clone()], |
| expected_resp: Err(ProtocolError::new(format!( |
| "expected initialize request, got {notification_msg:?}" |
| ))), |
| }); |
| } |
| } |