| /* |
| * libiio - Library for interfacing industrial I/O (IIO) devices |
| * |
| * Copyright (C) 2014 Analog Devices, Inc. |
| * Author: Paul Cercueil <[email protected]> |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Lesser General Public |
| * License as published by the Free Software Foundation; either |
| * version 2.1 of the License, or (at your option) any later version. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Lesser General Public License for more details. |
| * |
| * */ |
| |
| #include "ops.h" |
| #include "parser.h" |
| #include "thread-pool.h" |
| #include "../debug.h" |
| #include "../iio-private.h" |
| |
| #include <errno.h> |
| #include <limits.h> |
| #include <pthread.h> |
| #include <poll.h> |
| #include <stdbool.h> |
| #include <string.h> |
| #include <sys/eventfd.h> |
| #include <time.h> |
| #include <fcntl.h> |
| #include <signal.h> |
| |
| int yyparse(yyscan_t scanner); |
| |
| struct DevEntry; |
| |
| /* Corresponds to a thread reading from a device */ |
| struct ThdEntry { |
| SLIST_ENTRY(ThdEntry) parser_list_entry; |
| SLIST_ENTRY(ThdEntry) dev_list_entry; |
| unsigned int nb, sample_size, samples_count; |
| ssize_t err; |
| |
| int eventfd; |
| |
| struct parser_pdata *pdata; |
| struct iio_device *dev; |
| struct DevEntry *entry; |
| |
| uint32_t *mask; |
| bool active, is_writer, new_client, wait_for_open; |
| }; |
| |
| static void thd_entry_event_signal(struct ThdEntry *thd) |
| { |
| uint64_t e = 1; |
| int ret; |
| |
| do { |
| ret = write(thd->eventfd, &e, sizeof(e)); |
| } while (ret == -1 && errno == EINTR); |
| } |
| |
| static int thd_entry_event_wait(struct ThdEntry *thd, pthread_mutex_t *mutex, |
| int fd_in) |
| { |
| struct pollfd pfd[3]; |
| uint64_t e; |
| int ret; |
| |
| pthread_mutex_unlock(mutex); |
| |
| pfd[0].fd = thd->eventfd; |
| pfd[0].events = POLLIN; |
| pfd[1].fd = fd_in; |
| pfd[1].events = POLLRDHUP; |
| pfd[2].fd = thread_pool_get_poll_fd(thd->pdata->pool); |
| pfd[2].events = POLLIN; |
| |
| do { |
| poll_nointr(pfd, 3); |
| |
| if ((pfd[1].revents & POLLRDHUP) || (pfd[2].revents & POLLIN)) { |
| pthread_mutex_lock(mutex); |
| return -EPIPE; |
| } |
| |
| do { |
| ret = read(thd->eventfd, &e, sizeof(e)); |
| } while (ret == -1 && errno == EINTR); |
| } while (ret == -1 && errno == EAGAIN); |
| |
| pthread_mutex_lock(mutex); |
| |
| return 0; |
| } |
| |
| /* Corresponds to an opened device */ |
| struct DevEntry { |
| unsigned int ref_count; |
| |
| struct iio_device *dev; |
| struct iio_buffer *buf; |
| unsigned int sample_size, nb_clients; |
| bool update_mask; |
| bool cyclic; |
| bool closed; |
| bool cancelled; |
| |
| /* Linked list of ThdEntry structures corresponding |
| * to all the threads who opened the device */ |
| SLIST_HEAD(ThdHead, ThdEntry) thdlist_head; |
| pthread_mutex_t thdlist_lock; |
| |
| pthread_cond_t rw_ready_cond; |
| |
| uint32_t *mask; |
| size_t nb_words; |
| }; |
| |
| struct sample_cb_info { |
| struct parser_pdata *pdata; |
| unsigned int nb_bytes, cpt; |
| uint32_t *mask; |
| }; |
| |
| /* Protects iio_device_{set,get}_data() from concurrent access from multiple |
| * clients */ |
| static pthread_mutex_t devlist_lock = PTHREAD_MUTEX_INITIALIZER; |
| |
| #if WITH_AIO |
| static ssize_t async_io(struct parser_pdata *pdata, void *buf, size_t len, |
| bool do_read) |
| { |
| ssize_t ret; |
| struct pollfd pfd[2]; |
| unsigned int num_pfds; |
| struct iocb iocb; |
| struct iocb *ios[1]; |
| struct io_event e[1]; |
| |
| ios[0] = &iocb; |
| |
| if (do_read) |
| io_prep_pread(&iocb, pdata->fd_in, buf, len, 0); |
| else |
| io_prep_pwrite(&iocb, pdata->fd_out, buf, len, 0); |
| |
| io_set_eventfd(&iocb, pdata->aio_eventfd); |
| |
| pthread_mutex_lock(&pdata->aio_mutex); |
| |
| ret = io_submit(pdata->aio_ctx, 1, ios); |
| if (ret != 1) { |
| pthread_mutex_unlock(&pdata->aio_mutex); |
| ERROR("Failed to submit IO operation: %zd\n", ret); |
| return -EIO; |
| } |
| |
| pfd[0].fd = pdata->aio_eventfd; |
| pfd[0].events = POLLIN; |
| pfd[0].revents = 0; |
| pfd[1].fd = thread_pool_get_poll_fd(pdata->pool); |
| pfd[1].events = POLLIN; |
| pfd[1].revents = 0; |
| num_pfds = 2; |
| |
| do { |
| poll_nointr(pfd, num_pfds); |
| |
| if (pfd[0].revents & POLLIN) { |
| uint64_t event; |
| ret = read(pdata->aio_eventfd, &event, sizeof(event)); |
| if (ret != sizeof(event)) { |
| ERROR("Failed to read from eventfd: %d\n", -errno); |
| ret = -EIO; |
| break; |
| } |
| |
| ret = io_getevents(pdata->aio_ctx, 0, 1, e, NULL); |
| if (ret != 1) { |
| ERROR("Failed to read IO events: %zd\n", ret); |
| ret = -EIO; |
| break; |
| } else { |
| ret = (long)e[0].res; |
| } |
| } else if ((num_pfds > 1 && pfd[1].revents & POLLIN)) { |
| /* Got a STOP event to abort this whole session */ |
| ret = io_cancel(pdata->aio_ctx, &iocb, e); |
| if (ret != -EINPROGRESS && ret != -EINVAL) { |
| ERROR("Failed to cancel IO transfer: %zd\n", ret); |
| ret = -EIO; |
| break; |
| } |
| /* It should not be long now until we get the cancellation event */ |
| num_pfds = 1; |
| } |
| } while (!(pfd[0].revents & POLLIN)); |
| |
| pthread_mutex_unlock(&pdata->aio_mutex); |
| |
| /* Got STOP event, treat it as EOF */ |
| if (num_pfds == 1) |
| return 0; |
| |
| return ret; |
| } |
| |
| #define MAX_AIO_REQ_SIZE (1024 * 1024) |
| |
| static ssize_t readfd_aio(struct parser_pdata *pdata, void *dest, size_t len) |
| { |
| if (len > MAX_AIO_REQ_SIZE) |
| len = MAX_AIO_REQ_SIZE; |
| return async_io(pdata, dest, len, true); |
| } |
| |
| static ssize_t writefd_aio(struct parser_pdata *pdata, const void *dest, |
| size_t len) |
| { |
| if (len > MAX_AIO_REQ_SIZE) |
| len = MAX_AIO_REQ_SIZE; |
| return async_io(pdata, (void *)dest, len, false); |
| } |
| #endif /* WITH_AIO */ |
| |
| static ssize_t readfd_io(struct parser_pdata *pdata, void *dest, size_t len) |
| { |
| ssize_t ret; |
| struct pollfd pfd[2]; |
| |
| pfd[0].fd = pdata->fd_in; |
| pfd[0].events = POLLIN | POLLRDHUP; |
| pfd[0].revents = 0; |
| pfd[1].fd = thread_pool_get_poll_fd(pdata->pool); |
| pfd[1].events = POLLIN; |
| pfd[1].revents = 0; |
| |
| do { |
| poll_nointr(pfd, 2); |
| |
| /* Got STOP event, or client closed the socket: treat it as EOF */ |
| if (pfd[1].revents & POLLIN || pfd[0].revents & POLLRDHUP) |
| return 0; |
| if (pfd[0].revents & POLLERR) |
| return -EIO; |
| if (!(pfd[0].revents & POLLIN)) |
| continue; |
| |
| do { |
| if (pdata->fd_in_is_socket) |
| ret = recv(pdata->fd_in, dest, len, MSG_NOSIGNAL); |
| else |
| ret = read(pdata->fd_in, dest, len); |
| } while (ret == -1 && errno == EINTR); |
| |
| if (ret != -1 || errno != EAGAIN) |
| break; |
| } while (true); |
| |
| if (ret == -1) |
| return -errno; |
| |
| return ret; |
| } |
| |
| static ssize_t writefd_io(struct parser_pdata *pdata, const void *src, size_t len) |
| { |
| ssize_t ret; |
| struct pollfd pfd[2]; |
| |
| pfd[0].fd = pdata->fd_out; |
| pfd[0].events = POLLOUT; |
| pfd[0].revents = 0; |
| pfd[1].fd = thread_pool_get_poll_fd(pdata->pool); |
| pfd[1].events = POLLIN; |
| pfd[1].revents = 0; |
| |
| do { |
| poll_nointr(pfd, 2); |
| |
| /* Got STOP event, or client closed the socket: treat it as EOF */ |
| if (pfd[1].revents & POLLIN || pfd[0].revents & POLLHUP) |
| return 0; |
| if (pfd[0].revents & POLLERR) |
| return -EIO; |
| if (!(pfd[0].revents & POLLOUT)) |
| continue; |
| |
| do { |
| if (pdata->fd_out_is_socket) |
| ret = send(pdata->fd_out, src, len, MSG_NOSIGNAL); |
| else |
| ret = write(pdata->fd_out, src, len); |
| } while (ret == -1 && errno == EINTR); |
| |
| if (ret != -1 || errno != EAGAIN) |
| break; |
| } while (true); |
| |
| if (ret == -1) |
| return -errno; |
| |
| return ret; |
| } |
| |
| ssize_t write_all(struct parser_pdata *pdata, const void *src, size_t len) |
| { |
| uintptr_t ptr = (uintptr_t) src; |
| |
| while (len) { |
| ssize_t ret = pdata->writefd(pdata, (void *) ptr, len); |
| if (ret < 0) |
| return ret; |
| if (!ret) |
| return -EPIPE; |
| ptr += ret; |
| len -= ret; |
| } |
| |
| return ptr - (uintptr_t) src; |
| } |
| |
| static ssize_t read_all(struct parser_pdata *pdata, |
| void *dst, size_t len) |
| { |
| uintptr_t ptr = (uintptr_t) dst; |
| |
| while (len) { |
| ssize_t ret = pdata->readfd(pdata, (void *) ptr, len); |
| if (ret < 0) |
| return ret; |
| if (!ret) |
| return -EPIPE; |
| ptr += ret; |
| len -= ret; |
| } |
| |
| return ptr - (uintptr_t) dst; |
| } |
| |
| static void print_value(struct parser_pdata *pdata, long value) |
| { |
| if (pdata->verbose && value < 0) { |
| char buf[1024]; |
| iio_strerror(-value, buf, sizeof(buf)); |
| output(pdata, "ERROR: "); |
| output(pdata, buf); |
| output(pdata, "\n"); |
| } else { |
| char buf[128]; |
| sprintf(buf, "%li\n", value); |
| output(pdata, buf); |
| } |
| } |
| |
| static ssize_t send_sample(const struct iio_channel *chn, |
| void *src, size_t length, void *d) |
| { |
| struct sample_cb_info *info = d; |
| if (chn->index < 0 || !TEST_BIT(info->mask, chn->number)) |
| return 0; |
| if (info->nb_bytes < length) |
| return 0; |
| |
| if (info->cpt % length) { |
| unsigned int i, goal = length - info->cpt % length; |
| char zero = 0; |
| ssize_t ret; |
| |
| for (i = 0; i < goal; i++) { |
| ret = info->pdata->writefd(info->pdata, &zero, 1); |
| if (ret < 0) |
| return ret; |
| } |
| info->cpt += goal; |
| } |
| |
| info->cpt += length; |
| info->nb_bytes -= length; |
| return write_all(info->pdata, src, length); |
| } |
| |
| static ssize_t receive_sample(const struct iio_channel *chn, |
| void *dst, size_t length, void *d) |
| { |
| struct sample_cb_info *info = d; |
| if (chn->index < 0 || !TEST_BIT(info->mask, chn->number)) |
| return 0; |
| if (info->cpt == info->nb_bytes) |
| return 0; |
| |
| /* Skip the padding if needed */ |
| if (info->cpt % length) { |
| unsigned int i, goal = length - info->cpt % length; |
| char foo; |
| ssize_t ret; |
| |
| for (i = 0; i < goal; i++) { |
| ret = info->pdata->readfd(info->pdata, &foo, 1); |
| if (ret < 0) |
| return ret; |
| } |
| info->cpt += goal; |
| } |
| |
| info->cpt += length; |
| return read_all(info->pdata, dst, length); |
| } |
| |
| static ssize_t send_data(struct DevEntry *dev, struct ThdEntry *thd, size_t len) |
| { |
| struct parser_pdata *pdata = thd->pdata; |
| bool demux = server_demux && dev->sample_size != thd->sample_size; |
| |
| if (demux) |
| len = (len / dev->sample_size) * thd->sample_size; |
| if (len > thd->nb) |
| len = thd->nb; |
| |
| print_value(pdata, len); |
| |
| if (thd->new_client) { |
| unsigned int i; |
| char buf[129], *ptr = buf; |
| uint32_t *mask = demux ? thd->mask : dev->mask; |
| ssize_t ret; |
| |
| /* Send the current mask */ |
| for (i = dev->nb_words; i > 0 && ptr < buf + sizeof(buf); |
| i--, ptr += 8) |
| sprintf(ptr, "%08x", mask[i - 1]); |
| |
| *ptr = '\n'; |
| ret = write_all(pdata, buf, ptr + 1 - buf); |
| if (ret < 0) |
| return ret; |
| |
| thd->new_client = false; |
| } |
| |
| if (!demux) { |
| /* Short path */ |
| return write_all(pdata, dev->buf->buffer, len); |
| } else { |
| struct sample_cb_info info = { |
| .pdata = pdata, |
| .cpt = 0, |
| .nb_bytes = len, |
| .mask = thd->mask, |
| }; |
| |
| return iio_buffer_foreach_sample(dev->buf, send_sample, &info); |
| } |
| } |
| |
| static ssize_t receive_data(struct DevEntry *dev, struct ThdEntry *thd) |
| { |
| struct parser_pdata *pdata = thd->pdata; |
| |
| /* Inform that no error occured, and that we'll start reading data */ |
| if (thd->new_client) { |
| print_value(thd->pdata, 0); |
| thd->new_client = false; |
| } |
| |
| if (dev->sample_size == thd->sample_size) { |
| /* Short path: Receive directly in the buffer */ |
| |
| size_t len = dev->buf->length; |
| if (thd->nb < len) |
| len = thd->nb; |
| |
| return read_all(pdata, dev->buf->buffer, len); |
| } else { |
| /* Long path: Mux the samples to the buffer */ |
| |
| struct sample_cb_info info = { |
| .pdata = pdata, |
| .cpt = 0, |
| .nb_bytes = thd->nb, |
| .mask = thd->mask, |
| }; |
| |
| return iio_buffer_foreach_sample(dev->buf, |
| receive_sample, &info); |
| } |
| } |
| |
| static void dev_entry_put(struct DevEntry *entry) |
| { |
| bool free_entry = false; |
| |
| pthread_mutex_lock(&entry->thdlist_lock); |
| entry->ref_count--; |
| if (entry->ref_count == 0) |
| free_entry = true; |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| |
| if (free_entry) { |
| pthread_mutex_destroy(&entry->thdlist_lock); |
| pthread_cond_destroy(&entry->rw_ready_cond); |
| |
| free(entry->mask); |
| free(entry); |
| } |
| } |
| |
| static void signal_thread(struct ThdEntry *thd, ssize_t ret) |
| { |
| thd->err = ret; |
| thd->nb = 0; |
| thd->active = false; |
| thd_entry_event_signal(thd); |
| } |
| |
| static void rw_thd(struct thread_pool *pool, void *d) |
| { |
| struct DevEntry *entry = d; |
| struct ThdEntry *thd, *next_thd; |
| struct iio_device *dev = entry->dev; |
| unsigned int nb_words = entry->nb_words; |
| ssize_t ret = 0; |
| |
| DEBUG("R/W thread started for device %s\n", |
| dev->name ? dev->name : dev->id); |
| |
| while (true) { |
| bool has_readers = false, has_writers = false, |
| mask_updated = false; |
| unsigned int sample_size; |
| |
| /* NOTE: this while loop must exit with thdlist_lock locked. */ |
| pthread_mutex_lock(&entry->thdlist_lock); |
| |
| if (SLIST_EMPTY(&entry->thdlist_head)) |
| break; |
| |
| if (entry->update_mask) { |
| unsigned int i; |
| unsigned int samples_count = 0; |
| |
| memset(entry->mask, 0, nb_words * sizeof(*entry->mask)); |
| SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) { |
| for (i = 0; i < nb_words; i++) |
| entry->mask[i] |= thd->mask[i]; |
| |
| if (thd->samples_count > samples_count) |
| samples_count = thd->samples_count; |
| } |
| |
| if (entry->buf) |
| iio_buffer_destroy(entry->buf); |
| |
| for (i = 0; i < dev->nb_channels; i++) { |
| struct iio_channel *chn = dev->channels[i]; |
| long index = chn->index; |
| |
| if (index < 0) |
| continue; |
| |
| if (TEST_BIT(entry->mask, chn->number)) |
| iio_channel_enable(chn); |
| else |
| iio_channel_disable(chn); |
| } |
| |
| entry->buf = iio_device_create_buffer(dev, |
| samples_count, entry->cyclic); |
| if (!entry->buf) { |
| ret = -errno; |
| ERROR("Unable to create buffer\n"); |
| break; |
| } |
| entry->cancelled = false; |
| |
| /* Signal the threads that we opened the device */ |
| SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) { |
| if (thd->wait_for_open) { |
| thd->wait_for_open = false; |
| signal_thread(thd, 0); |
| } |
| } |
| |
| DEBUG("IIO device %s reopened with new mask:\n", |
| dev->id); |
| for (i = 0; i < nb_words; i++) |
| DEBUG("Mask[%i] = 0x%08x\n", i, entry->mask[i]); |
| entry->update_mask = false; |
| |
| entry->sample_size = iio_device_get_sample_size(dev); |
| mask_updated = true; |
| } |
| |
| sample_size = entry->sample_size; |
| |
| SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) { |
| thd->active = !thd->err && thd->nb >= sample_size; |
| if (mask_updated && thd->active) |
| signal_thread(thd, thd->nb); |
| |
| if (thd->is_writer) |
| has_writers |= thd->active; |
| else |
| has_readers |= thd->active; |
| } |
| |
| if (!has_readers && !has_writers) { |
| pthread_cond_wait(&entry->rw_ready_cond, |
| &entry->thdlist_lock); |
| } |
| |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| |
| if (!has_readers && !has_writers) |
| continue; |
| |
| if (has_readers) { |
| ssize_t nb_bytes; |
| |
| ret = iio_buffer_refill(entry->buf); |
| |
| pthread_mutex_lock(&entry->thdlist_lock); |
| |
| /* |
| * When the last client disconnects the buffer is |
| * cancelled and iio_buffer_refill() returns an error. A |
| * new client might have connected before we got here |
| * though, in that case the rw thread has to stay active |
| * and a new buffer is created. If the list is still empty the loop |
| * will exit normally. |
| */ |
| if (entry->cancelled) { |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| continue; |
| } |
| |
| if (ret < 0) { |
| ERROR("Reading from device failed: %i\n", |
| (int) ret); |
| break; |
| } |
| |
| nb_bytes = ret; |
| |
| /* We don't use SLIST_FOREACH here. As soon as a thread is |
| * signaled, its "thd" structure might be freed; |
| * SLIST_FOREACH would then cause a segmentation fault, as it |
| * reads "thd" to get the address of the next element. */ |
| for (thd = SLIST_FIRST(&entry->thdlist_head); |
| thd; thd = next_thd) { |
| next_thd = SLIST_NEXT(thd, dev_list_entry); |
| |
| if (!thd->active || thd->is_writer) |
| continue; |
| |
| ret = send_data(entry, thd, nb_bytes); |
| if (ret > 0) |
| thd->nb -= ret; |
| |
| if (ret < 0 || thd->nb < sample_size) |
| signal_thread(thd, (ret < 0) ? |
| ret : thd->nb); |
| } |
| |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| } |
| |
| if (has_writers) { |
| ssize_t nb_bytes = 0; |
| |
| pthread_mutex_lock(&entry->thdlist_lock); |
| |
| /* Reset the size of the buffer to its maximum size */ |
| entry->buf->data_length = entry->buf->length; |
| |
| /* Same comment as above */ |
| for (thd = SLIST_FIRST(&entry->thdlist_head); |
| thd; thd = next_thd) { |
| next_thd = SLIST_NEXT(thd, dev_list_entry); |
| |
| if (!thd->active || !thd->is_writer) |
| continue; |
| |
| ret = receive_data(entry, thd); |
| if (ret > 0) { |
| thd->nb -= ret; |
| if (ret > nb_bytes) |
| nb_bytes = ret; |
| } |
| |
| if (ret < 0) |
| signal_thread(thd, ret); |
| } |
| |
| ret = iio_buffer_push_partial(entry->buf, |
| nb_bytes / sample_size); |
| if (entry->cancelled) { |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| continue; |
| } |
| if (ret < 0) { |
| ERROR("Writing to device failed: %i\n", |
| (int) ret); |
| break; |
| } |
| |
| /* Signal threads which completed their RW command */ |
| for (thd = SLIST_FIRST(&entry->thdlist_head); |
| thd; thd = next_thd) { |
| next_thd = SLIST_NEXT(thd, dev_list_entry); |
| if (thd->active && thd->is_writer && |
| thd->nb < sample_size) |
| signal_thread(thd, thd->nb); |
| } |
| |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| } |
| } |
| |
| /* Signal all remaining threads */ |
| for (thd = SLIST_FIRST(&entry->thdlist_head); thd; thd = next_thd) { |
| next_thd = SLIST_NEXT(thd, dev_list_entry); |
| SLIST_REMOVE(&entry->thdlist_head, thd, ThdEntry, dev_list_entry); |
| thd->wait_for_open = false; |
| signal_thread(thd, ret); |
| } |
| if (entry->buf) { |
| iio_buffer_destroy(entry->buf); |
| entry->buf = NULL; |
| } |
| entry->closed = true; |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| |
| pthread_mutex_lock(&devlist_lock); |
| /* It is possible that a new thread has already started, make sure to |
| * not overwrite it. */ |
| if (iio_device_get_data(dev) == entry) |
| iio_device_set_data(dev, NULL); |
| pthread_mutex_unlock(&devlist_lock); |
| |
| DEBUG("Stopping R/W thread for device %s\n", |
| dev->name ? dev->name : dev->id); |
| |
| dev_entry_put(entry); |
| } |
| |
| static struct ThdEntry *parser_lookup_thd_entry(struct parser_pdata *pdata, |
| struct iio_device *dev) |
| { |
| struct ThdEntry *t; |
| |
| SLIST_FOREACH(t, &pdata->thdlist_head, parser_list_entry) { |
| if (t->dev == dev) |
| return t; |
| } |
| |
| return NULL; |
| } |
| |
| static ssize_t rw_buffer(struct parser_pdata *pdata, |
| struct iio_device *dev, unsigned int nb, bool is_write) |
| { |
| struct DevEntry *entry; |
| struct ThdEntry *thd; |
| ssize_t ret; |
| |
| if (!dev) |
| return -ENODEV; |
| |
| thd = parser_lookup_thd_entry(pdata, dev); |
| if (!thd) |
| return -EBADF; |
| |
| entry = thd->entry; |
| |
| if (nb < entry->sample_size) |
| return 0; |
| |
| pthread_mutex_lock(&entry->thdlist_lock); |
| if (entry->closed) { |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| return -EBADF; |
| } |
| |
| if (thd->nb) { |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| return -EBUSY; |
| } |
| |
| thd->new_client = true; |
| thd->nb = nb; |
| thd->err = 0; |
| thd->is_writer = is_write; |
| thd->active = true; |
| |
| pthread_cond_signal(&entry->rw_ready_cond); |
| |
| DEBUG("Waiting for completion...\n"); |
| while (thd->active) { |
| ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in); |
| if (ret) |
| break; |
| } |
| if (ret == 0) |
| ret = thd->err; |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| |
| if (ret > 0 && ret < nb) |
| print_value(thd->pdata, 0); |
| |
| DEBUG("Exiting rw_buffer with code %li\n", (long) ret); |
| if (ret < 0) |
| return ret; |
| else |
| return nb - ret; |
| } |
| |
| static uint32_t *get_mask(const char *mask, size_t *len) |
| { |
| size_t nb = (*len + 7) / 8; |
| uint32_t *ptr, *words = calloc(nb, sizeof(*words)); |
| if (!words) |
| return NULL; |
| |
| ptr = words + nb; |
| while (*mask) { |
| char buf[9]; |
| sprintf(buf, "%.*s", 8, mask); |
| sscanf(buf, "%08x", --ptr); |
| mask += 8; |
| DEBUG("Mask[%lu]: 0x%08x\n", |
| (unsigned long) (words - ptr) / 4, *ptr); |
| } |
| |
| *len = nb; |
| return words; |
| } |
| |
| static void free_thd_entry(struct ThdEntry *t) |
| { |
| close(t->eventfd); |
| free(t->mask); |
| free(t); |
| } |
| |
| static void remove_thd_entry(struct ThdEntry *t) |
| { |
| struct DevEntry *entry = t->entry; |
| |
| pthread_mutex_lock(&entry->thdlist_lock); |
| if (!entry->closed) { |
| entry->update_mask = true; |
| SLIST_REMOVE(&entry->thdlist_head, t, ThdEntry, dev_list_entry); |
| if (SLIST_EMPTY(&entry->thdlist_head) && entry->buf) { |
| entry->cancelled = true; |
| iio_buffer_cancel(entry->buf); /* Wakeup the rw thread */ |
| } |
| |
| pthread_cond_signal(&entry->rw_ready_cond); |
| } |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| dev_entry_put(entry); |
| |
| free_thd_entry(t); |
| } |
| |
| static int open_dev_helper(struct parser_pdata *pdata, struct iio_device *dev, |
| size_t samples_count, const char *mask, bool cyclic) |
| { |
| int ret = -ENOMEM; |
| struct DevEntry *entry; |
| struct ThdEntry *thd; |
| size_t len = strlen(mask); |
| uint32_t *words; |
| unsigned int nb_channels; |
| unsigned int cyclic_retry = 500; |
| |
| if (!dev) |
| return -ENODEV; |
| |
| nb_channels = dev->nb_channels; |
| if (len != ((nb_channels + 31) / 32) * 8) |
| return -EINVAL; |
| |
| words = get_mask(mask, &len); |
| if (!words) |
| return -ENOMEM; |
| |
| thd = zalloc(sizeof(*thd)); |
| if (!thd) |
| goto err_free_words; |
| |
| thd->wait_for_open = true; |
| thd->mask = words; |
| thd->nb = 0; |
| thd->samples_count = samples_count; |
| thd->sample_size = iio_device_get_sample_size_mask(dev, words, len); |
| thd->pdata = pdata; |
| thd->dev = dev; |
| thd->eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); |
| |
| retry: |
| /* Atomically look up the thread and make sure that it is still active |
| * or allocate new one. */ |
| pthread_mutex_lock(&devlist_lock); |
| entry = iio_device_get_data(dev); |
| if (entry) { |
| if (cyclic || entry->cyclic) { |
| /* Only one client allowed in cyclic mode */ |
| pthread_mutex_unlock(&devlist_lock); |
| |
| /* There is an inherent race condition if a client |
| * creates a new cyclic buffer shortly after destroying |
| * a previous. E.g. like |
| * |
| * iio_buffer_destroy(buf); |
| * buf = iio_device_create_buffer(dev, n, true); |
| * |
| * In this case the two buffers each use their own |
| * communication channel which are unordered to each |
| * other. E.g. the socket open might arrive before the |
| * socket close on the host side, even though they were |
| * sent in the opposite order on the client side. This |
| * race condition can cause an error being reported back |
| * to the client, even though the code on the client |
| * side was well formed and would work fine e.g. using |
| * the local backend. |
| * |
| * To avoid this issue go to sleep for up to 50ms in |
| * intervals of 100us. This should be enough time for |
| * the issue to resolve itself. If there actually is |
| * contention on the buffer an error will eventually be |
| * returned in which case the additional delay cause by |
| * the retires should not matter too much. |
| * |
| * This is not pretty but it works. |
| */ |
| if (cyclic_retry) { |
| cyclic_retry--; |
| usleep(100); |
| goto retry; |
| } |
| |
| ret = -EBUSY; |
| goto err_free_thd; |
| } |
| |
| pthread_mutex_lock(&entry->thdlist_lock); |
| if (!entry->closed) { |
| pthread_mutex_unlock(&devlist_lock); |
| |
| entry->ref_count++; |
| |
| SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry); |
| thd->entry = entry; |
| entry->update_mask = true; |
| DEBUG("Added thread to client list\n"); |
| |
| pthread_cond_signal(&entry->rw_ready_cond); |
| |
| /* Wait until the device is opened by the rw thread */ |
| while (thd->wait_for_open) { |
| ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in); |
| if (ret) |
| break; |
| } |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| |
| if (ret == 0) |
| ret = (int) thd->err; |
| if (ret < 0) |
| remove_thd_entry(thd); |
| else |
| SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry); |
| return ret; |
| } else { |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| } |
| } |
| |
| entry = zalloc(sizeof(*entry)); |
| if (!entry) { |
| pthread_mutex_unlock(&devlist_lock); |
| goto err_free_thd; |
| } |
| |
| entry->ref_count = 2; /* One for thread, one for the client */ |
| |
| entry->mask = malloc(len * sizeof(*words)); |
| if (!entry->mask) { |
| pthread_mutex_unlock(&devlist_lock); |
| goto err_free_entry; |
| } |
| |
| entry->cyclic = cyclic; |
| entry->nb_words = len; |
| entry->update_mask = true; |
| entry->dev = dev; |
| entry->buf = NULL; |
| SLIST_INIT(&entry->thdlist_head); |
| SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry); |
| thd->entry = entry; |
| DEBUG("Added thread to client list\n"); |
| |
| pthread_mutex_init(&entry->thdlist_lock, NULL); |
| pthread_cond_init(&entry->rw_ready_cond, NULL); |
| |
| ret = thread_pool_add_thread(main_thread_pool, rw_thd, entry, "rw_thd"); |
| if (ret) { |
| pthread_mutex_unlock(&devlist_lock); |
| goto err_free_entry_mask; |
| } |
| |
| DEBUG("Adding new device thread to device list\n"); |
| iio_device_set_data(dev, entry); |
| pthread_mutex_unlock(&devlist_lock); |
| |
| pthread_mutex_lock(&entry->thdlist_lock); |
| /* Wait until the device is opened by the rw thread */ |
| while (thd->wait_for_open) { |
| ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in); |
| if (ret) |
| break; |
| } |
| pthread_mutex_unlock(&entry->thdlist_lock); |
| |
| if (ret == 0) |
| ret = (int) thd->err; |
| if (ret < 0) |
| remove_thd_entry(thd); |
| else |
| SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry); |
| return ret; |
| |
| err_free_entry_mask: |
| free(entry->mask); |
| err_free_entry: |
| free(entry); |
| err_free_thd: |
| close(thd->eventfd); |
| free(thd); |
| err_free_words: |
| free(words); |
| return ret; |
| } |
| |
| static int close_dev_helper(struct parser_pdata *pdata, struct iio_device *dev) |
| { |
| struct ThdEntry *t; |
| |
| if (!dev) |
| return -ENODEV; |
| |
| t = parser_lookup_thd_entry(pdata, dev); |
| if (!t) |
| return -ENXIO; |
| |
| SLIST_REMOVE(&pdata->thdlist_head, t, ThdEntry, parser_list_entry); |
| remove_thd_entry(t); |
| |
| return 0; |
| } |
| |
| int open_dev(struct parser_pdata *pdata, struct iio_device *dev, |
| size_t samples_count, const char *mask, bool cyclic) |
| { |
| int ret = open_dev_helper(pdata, dev, samples_count, mask, cyclic); |
| print_value(pdata, ret); |
| return ret; |
| } |
| |
| int close_dev(struct parser_pdata *pdata, struct iio_device *dev) |
| { |
| int ret = close_dev_helper(pdata, dev); |
| print_value(pdata, ret); |
| return ret; |
| } |
| |
| ssize_t rw_dev(struct parser_pdata *pdata, struct iio_device *dev, |
| unsigned int nb, bool is_write) |
| { |
| ssize_t ret = rw_buffer(pdata, dev, nb, is_write); |
| if (ret <= 0 || is_write) |
| print_value(pdata, ret); |
| return ret; |
| } |
| |
| ssize_t read_dev_attr(struct parser_pdata *pdata, struct iio_device *dev, |
| const char *attr, enum iio_attr_type type) |
| { |
| /* We use a very large buffer here, as if attr is NULL all the |
| * attributes will be read, which may represents a few kilobytes worth |
| * of data. */ |
| char buf[0x10000]; |
| ssize_t ret = -EINVAL; |
| |
| if (!dev) { |
| print_value(pdata, -ENODEV); |
| return -ENODEV; |
| } |
| |
| switch (type) { |
| case IIO_ATTR_TYPE_DEVICE: |
| ret = iio_device_attr_read(dev, attr, buf, sizeof(buf) - 1); |
| break; |
| case IIO_ATTR_TYPE_DEBUG: |
| ret = iio_device_debug_attr_read(dev, |
| attr, buf, sizeof(buf) - 1); |
| break; |
| case IIO_ATTR_TYPE_BUFFER: |
| ret = iio_device_buffer_attr_read(dev, |
| attr, buf, sizeof(buf) - 1); |
| break; |
| default: |
| ret = -EINVAL; |
| break; |
| } |
| print_value(pdata, ret); |
| if (ret < 0) |
| return ret; |
| |
| buf[ret] = '\n'; |
| return write_all(pdata, buf, ret + 1); |
| } |
| |
| ssize_t write_dev_attr(struct parser_pdata *pdata, struct iio_device *dev, |
| const char *attr, size_t len, enum iio_attr_type type) |
| { |
| ssize_t ret = -ENOMEM; |
| char *buf; |
| |
| if (!dev) { |
| ret = -ENODEV; |
| goto err_print_value; |
| } |
| |
| buf = malloc(len); |
| if (!buf) |
| goto err_print_value; |
| |
| ret = read_all(pdata, buf, len); |
| if (ret < 0) |
| goto err_free_buffer; |
| |
| switch (type) { |
| case IIO_ATTR_TYPE_DEVICE: |
| ret = iio_device_attr_write_raw(dev, attr, buf, len); |
| break; |
| case IIO_ATTR_TYPE_DEBUG: |
| ret = iio_device_debug_attr_write_raw(dev, attr, buf, len); |
| break; |
| case IIO_ATTR_TYPE_BUFFER: |
| ret = iio_device_buffer_attr_write_raw(dev, attr, buf, len); |
| break; |
| default: |
| ret = -EINVAL; |
| break; |
| } |
| |
| err_free_buffer: |
| free(buf); |
| err_print_value: |
| print_value(pdata, ret); |
| return ret; |
| } |
| |
| ssize_t read_chn_attr(struct parser_pdata *pdata, |
| struct iio_channel *chn, const char *attr) |
| { |
| char buf[1024]; |
| ssize_t ret = -ENODEV; |
| |
| if (chn) |
| ret = iio_channel_attr_read(chn, attr, buf, sizeof(buf) - 1); |
| else if (pdata->dev) |
| ret = -ENXIO; |
| print_value(pdata, ret); |
| if (ret < 0) |
| return ret; |
| |
| buf[ret] = '\n'; |
| return write_all(pdata, buf, ret + 1); |
| } |
| |
| ssize_t write_chn_attr(struct parser_pdata *pdata, |
| struct iio_channel *chn, const char *attr, size_t len) |
| { |
| ssize_t ret = -ENOMEM; |
| char *buf = malloc(len); |
| if (!buf) |
| goto err_print_value; |
| |
| ret = read_all(pdata, buf, len); |
| if (ret < 0) |
| goto err_free_buffer; |
| |
| if (chn) |
| ret = iio_channel_attr_write_raw(chn, attr, buf, len); |
| else if (pdata->dev) |
| ret = -ENXIO; |
| else |
| ret = -ENODEV; |
| err_free_buffer: |
| free(buf); |
| err_print_value: |
| print_value(pdata, ret); |
| return ret; |
| } |
| |
| ssize_t set_trigger(struct parser_pdata *pdata, |
| struct iio_device *dev, const char *trigger) |
| { |
| struct iio_device *trig = NULL; |
| ssize_t ret = -ENOENT; |
| |
| if (!dev) { |
| ret = -ENODEV; |
| goto err_print_value; |
| } |
| |
| if (trigger) { |
| trig = iio_context_find_device(pdata->ctx, trigger); |
| if (!trig) |
| goto err_print_value; |
| } |
| |
| ret = iio_device_set_trigger(dev, trig); |
| err_print_value: |
| print_value(pdata, ret); |
| return ret; |
| } |
| |
| ssize_t get_trigger(struct parser_pdata *pdata, struct iio_device *dev) |
| { |
| const struct iio_device *trigger; |
| ssize_t ret; |
| |
| if (!dev) { |
| print_value(pdata, -ENODEV); |
| return -ENODEV; |
| } |
| |
| ret = iio_device_get_trigger(dev, &trigger); |
| if (!ret && trigger) { |
| char buf[256]; |
| |
| ret = strlen(trigger->name); |
| print_value(pdata, ret); |
| |
| snprintf(buf, sizeof(buf), "%s\n", trigger->name); |
| ret = write_all(pdata, buf, ret + 1); |
| } else { |
| print_value(pdata, ret); |
| } |
| return ret; |
| } |
| |
| int set_timeout(struct parser_pdata *pdata, unsigned int timeout) |
| { |
| int ret = iio_context_set_timeout(pdata->ctx, timeout); |
| print_value(pdata, ret); |
| return ret; |
| } |
| |
| int set_buffers_count(struct parser_pdata *pdata, |
| struct iio_device *dev, long value) |
| { |
| int ret = -EINVAL; |
| |
| if (!dev) { |
| ret = -ENODEV; |
| goto err_print_value; |
| } |
| |
| if (value >= 1) |
| ret = iio_device_set_kernel_buffers_count( |
| dev, (unsigned int) value); |
| err_print_value: |
| print_value(pdata, ret); |
| return ret; |
| } |
| |
| ssize_t read_line(struct parser_pdata *pdata, char *buf, size_t len) |
| { |
| ssize_t ret; |
| |
| if (pdata->fd_in_is_socket) { |
| struct pollfd pfd[2]; |
| bool found; |
| size_t bytes_read = 0; |
| |
| pfd[0].fd = pdata->fd_in; |
| pfd[0].events = POLLIN | POLLRDHUP; |
| pfd[0].revents = 0; |
| pfd[1].fd = thread_pool_get_poll_fd(pdata->pool); |
| pfd[1].events = POLLIN; |
| pfd[1].revents = 0; |
| |
| do { |
| size_t i, to_trunc; |
| |
| poll_nointr(pfd, 2); |
| |
| if (pfd[1].revents & POLLIN || |
| pfd[0].revents & POLLRDHUP) |
| return 0; |
| |
| /* First read from the socket, without advancing the |
| * read offset */ |
| ret = recv(pdata->fd_in, buf, len, |
| MSG_NOSIGNAL | MSG_PEEK); |
| if (ret < 0) |
| return -errno; |
| |
| /* Lookup for the trailing \n */ |
| for (i = 0; i < (size_t) ret && buf[i] != '\n'; i++); |
| found = i < (size_t) ret; |
| |
| len -= ret; |
| buf += ret; |
| |
| to_trunc = found ? i + 1 : (size_t) ret; |
| |
| /* Advance the read offset after the \n if found, or |
| * after the last character read otherwise */ |
| ret = recv(pdata->fd_in, NULL, to_trunc, |
| MSG_NOSIGNAL | MSG_TRUNC); |
| if (ret < 0) |
| return -errno; |
| |
| bytes_read += to_trunc; |
| } while (!found && len); |
| |
| /* No \n found? Just garbage data */ |
| if (!found) |
| ret = -EIO; |
| else |
| ret = bytes_read; |
| } else { |
| ret = pdata->readfd(pdata, buf, len); |
| } |
| |
| return ret; |
| } |
| |
| void interpreter(struct iio_context *ctx, int fd_in, int fd_out, bool verbose, |
| bool is_socket, bool use_aio, struct thread_pool *pool) |
| { |
| yyscan_t scanner; |
| struct parser_pdata pdata; |
| unsigned int i; |
| int ret; |
| |
| pdata.ctx = ctx; |
| pdata.stop = false; |
| pdata.fd_in = fd_in; |
| pdata.fd_out = fd_out; |
| pdata.verbose = verbose; |
| pdata.pool = pool; |
| |
| pdata.fd_in_is_socket = is_socket; |
| pdata.fd_out_is_socket = is_socket; |
| |
| SLIST_INIT(&pdata.thdlist_head); |
| |
| if (use_aio) { |
| /* Note: if WITH_AIO is not defined, use_aio is always false. |
| * We ensure that in iiod.c. */ |
| #if WITH_AIO |
| char err_str[1024]; |
| |
| pdata.aio_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); |
| if (pdata.aio_eventfd < 0) { |
| iio_strerror(errno, err_str, sizeof(err_str)); |
| ERROR("Failed to create AIO eventfd: %s\n", err_str); |
| return; |
| } |
| |
| pdata.aio_ctx = 0; |
| ret = io_setup(1, &pdata.aio_ctx); |
| if (ret < 0) { |
| iio_strerror(-ret, err_str, sizeof(err_str)); |
| ERROR("Failed to create AIO context: %s\n", err_str); |
| close(pdata.aio_eventfd); |
| return; |
| } |
| pthread_mutex_init(&pdata.aio_mutex, NULL); |
| pdata.readfd = readfd_aio; |
| pdata.writefd = writefd_aio; |
| #endif |
| } else { |
| pdata.readfd = readfd_io; |
| pdata.writefd = writefd_io; |
| } |
| |
| yylex_init_extra(&pdata, &scanner); |
| |
| do { |
| if (verbose) |
| output(&pdata, "iio-daemon > "); |
| ret = yyparse(scanner); |
| } while (!pdata.stop && ret >= 0); |
| |
| yylex_destroy(scanner); |
| |
| /* Close all opened devices */ |
| for (i = 0; i < ctx->nb_devices; i++) |
| close_dev_helper(&pdata, ctx->devices[i]); |
| |
| #if WITH_AIO |
| if (use_aio) { |
| io_destroy(pdata.aio_ctx); |
| close(pdata.aio_eventfd); |
| } |
| #endif |
| } |