summaryrefslogtreecommitdiff
path: root/iiod/ops.c
diff options
context:
space:
mode:
Diffstat (limited to 'iiod/ops.c')
-rw-r--r--iiod/ops.c1421
1 files changed, 1421 insertions, 0 deletions
diff --git a/iiod/ops.c b/iiod/ops.c
new file mode 100644
index 0000000..bf26bc1
--- /dev/null
+++ b/iiod/ops.c
@@ -0,0 +1,1421 @@
+/*
+ * libiio - Library for interfacing industrial I/O (IIO) devices
+ *
+ * Copyright (C) 2014 Analog Devices, Inc.
+ * Author: Paul Cercueil <paul.cercueil@analog.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * */
+
+#include "ops.h"
+#include "parser.h"
+#include "thread-pool.h"
+#include "../debug.h"
+#include "../iio-private.h"
+
+#include <errno.h>
+#include <limits.h>
+#include <pthread.h>
+#include <poll.h>
+#include <stdbool.h>
+#include <string.h>
+#include <sys/eventfd.h>
+#include <time.h>
+#include <fcntl.h>
+#include <signal.h>
+
+int yyparse(yyscan_t scanner);
+
+struct DevEntry;
+
+/* Corresponds to a thread reading from a device */
+struct ThdEntry {
+ SLIST_ENTRY(ThdEntry) parser_list_entry;
+ SLIST_ENTRY(ThdEntry) dev_list_entry;
+ unsigned int nb, sample_size, samples_count;
+ ssize_t err;
+
+ int eventfd;
+
+ struct parser_pdata *pdata;
+ struct iio_device *dev;
+ struct DevEntry *entry;
+
+ uint32_t *mask;
+ bool active, is_writer, new_client, wait_for_open;
+};
+
+static void thd_entry_event_signal(struct ThdEntry *thd)
+{
+ uint64_t e = 1;
+ int ret;
+
+ do {
+ ret = write(thd->eventfd, &e, sizeof(e));
+ } while (ret == -1 && errno == EINTR);
+}
+
+static int thd_entry_event_wait(struct ThdEntry *thd, pthread_mutex_t *mutex,
+ int fd_in)
+{
+ struct pollfd pfd[3];
+ uint64_t e;
+ int ret;
+
+ pthread_mutex_unlock(mutex);
+
+ pfd[0].fd = thd->eventfd;
+ pfd[0].events = POLLIN;
+ pfd[1].fd = fd_in;
+ pfd[1].events = POLLRDHUP;
+ pfd[2].fd = thread_pool_get_poll_fd(thd->pdata->pool);
+ pfd[2].events = POLLIN;
+
+ do {
+ poll_nointr(pfd, 3);
+
+ if ((pfd[1].revents & POLLRDHUP) || (pfd[2].revents & POLLIN)) {
+ pthread_mutex_lock(mutex);
+ return -EPIPE;
+ }
+
+ do {
+ ret = read(thd->eventfd, &e, sizeof(e));
+ } while (ret == -1 && errno == EINTR);
+ } while (ret == -1 && errno == EAGAIN);
+
+ pthread_mutex_lock(mutex);
+
+ return 0;
+}
+
+/* Corresponds to an opened device */
+struct DevEntry {
+ unsigned int ref_count;
+
+ struct iio_device *dev;
+ struct iio_buffer *buf;
+ unsigned int sample_size, nb_clients;
+ bool update_mask;
+ bool cyclic;
+ bool closed;
+ bool cancelled;
+
+ /* Linked list of ThdEntry structures corresponding
+ * to all the threads who opened the device */
+ SLIST_HEAD(ThdHead, ThdEntry) thdlist_head;
+ pthread_mutex_t thdlist_lock;
+
+ pthread_cond_t rw_ready_cond;
+
+ uint32_t *mask;
+ size_t nb_words;
+};
+
+struct sample_cb_info {
+ struct parser_pdata *pdata;
+ unsigned int nb_bytes, cpt;
+ uint32_t *mask;
+};
+
+/* Protects iio_device_{set,get}_data() from concurrent access from multiple
+ * clients */
+static pthread_mutex_t devlist_lock = PTHREAD_MUTEX_INITIALIZER;
+
+#if WITH_AIO
+static ssize_t async_io(struct parser_pdata *pdata, void *buf, size_t len,
+ bool do_read)
+{
+ ssize_t ret;
+ struct pollfd pfd[2];
+ unsigned int num_pfds;
+ struct iocb iocb;
+ struct iocb *ios[1];
+ struct io_event e[1];
+
+ ios[0] = &iocb;
+
+ if (do_read)
+ io_prep_pread(&iocb, pdata->fd_in, buf, len, 0);
+ else
+ io_prep_pwrite(&iocb, pdata->fd_out, buf, len, 0);
+
+ io_set_eventfd(&iocb, pdata->aio_eventfd);
+
+ pthread_mutex_lock(&pdata->aio_mutex);
+
+ ret = io_submit(pdata->aio_ctx, 1, ios);
+ if (ret != 1) {
+ pthread_mutex_unlock(&pdata->aio_mutex);
+ ERROR("Failed to submit IO operation: %zd\n", ret);
+ return -EIO;
+ }
+
+ pfd[0].fd = pdata->aio_eventfd;
+ pfd[0].events = POLLIN;
+ pfd[0].revents = 0;
+ pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
+ pfd[1].events = POLLIN;
+ pfd[1].revents = 0;
+ num_pfds = 2;
+
+ do {
+ poll_nointr(pfd, num_pfds);
+
+ if (pfd[0].revents & POLLIN) {
+ uint64_t event;
+ ret = read(pdata->aio_eventfd, &event, sizeof(event));
+ if (ret != sizeof(event)) {
+ ERROR("Failed to read from eventfd: %d\n", -errno);
+ ret = -EIO;
+ break;
+ }
+
+ ret = io_getevents(pdata->aio_ctx, 0, 1, e, NULL);
+ if (ret != 1) {
+ ERROR("Failed to read IO events: %zd\n", ret);
+ ret = -EIO;
+ break;
+ } else {
+ ret = (long)e[0].res;
+ }
+ } else if ((num_pfds > 1 && pfd[1].revents & POLLIN)) {
+ /* Got a STOP event to abort this whole session */
+ ret = io_cancel(pdata->aio_ctx, &iocb, e);
+ if (ret != -EINPROGRESS && ret != -EINVAL) {
+ ERROR("Failed to cancel IO transfer: %zd\n", ret);
+ ret = -EIO;
+ break;
+ }
+ /* It should not be long now until we get the cancellation event */
+ num_pfds = 1;
+ }
+ } while (!(pfd[0].revents & POLLIN));
+
+ pthread_mutex_unlock(&pdata->aio_mutex);
+
+ /* Got STOP event, treat it as EOF */
+ if (num_pfds == 1)
+ return 0;
+
+ return ret;
+}
+
+#define MAX_AIO_REQ_SIZE (1024 * 1024)
+
+static ssize_t readfd_aio(struct parser_pdata *pdata, void *dest, size_t len)
+{
+ if (len > MAX_AIO_REQ_SIZE)
+ len = MAX_AIO_REQ_SIZE;
+ return async_io(pdata, dest, len, true);
+}
+
+static ssize_t writefd_aio(struct parser_pdata *pdata, const void *dest,
+ size_t len)
+{
+ if (len > MAX_AIO_REQ_SIZE)
+ len = MAX_AIO_REQ_SIZE;
+ return async_io(pdata, (void *)dest, len, false);
+}
+#endif /* WITH_AIO */
+
+static ssize_t readfd_io(struct parser_pdata *pdata, void *dest, size_t len)
+{
+ ssize_t ret;
+ struct pollfd pfd[2];
+
+ pfd[0].fd = pdata->fd_in;
+ pfd[0].events = POLLIN | POLLRDHUP;
+ pfd[0].revents = 0;
+ pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
+ pfd[1].events = POLLIN;
+ pfd[1].revents = 0;
+
+ do {
+ poll_nointr(pfd, 2);
+
+ /* Got STOP event, or client closed the socket: treat it as EOF */
+ if (pfd[1].revents & POLLIN || pfd[0].revents & POLLRDHUP)
+ return 0;
+ if (pfd[0].revents & POLLERR)
+ return -EIO;
+ if (!(pfd[0].revents & POLLIN))
+ continue;
+
+ do {
+ if (pdata->fd_in_is_socket)
+ ret = recv(pdata->fd_in, dest, len, MSG_NOSIGNAL);
+ else
+ ret = read(pdata->fd_in, dest, len);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret != -1 || errno != EAGAIN)
+ break;
+ } while (true);
+
+ if (ret == -1)
+ return -errno;
+
+ return ret;
+}
+
+static ssize_t writefd_io(struct parser_pdata *pdata, const void *src, size_t len)
+{
+ ssize_t ret;
+ struct pollfd pfd[2];
+
+ pfd[0].fd = pdata->fd_out;
+ pfd[0].events = POLLOUT;
+ pfd[0].revents = 0;
+ pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
+ pfd[1].events = POLLIN;
+ pfd[1].revents = 0;
+
+ do {
+ poll_nointr(pfd, 2);
+
+ /* Got STOP event, or client closed the socket: treat it as EOF */
+ if (pfd[1].revents & POLLIN || pfd[0].revents & POLLHUP)
+ return 0;
+ if (pfd[0].revents & POLLERR)
+ return -EIO;
+ if (!(pfd[0].revents & POLLOUT))
+ continue;
+
+ do {
+ if (pdata->fd_out_is_socket)
+ ret = send(pdata->fd_out, src, len, MSG_NOSIGNAL);
+ else
+ ret = write(pdata->fd_out, src, len);
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret != -1 || errno != EAGAIN)
+ break;
+ } while (true);
+
+ if (ret == -1)
+ return -errno;
+
+ return ret;
+}
+
+ssize_t write_all(struct parser_pdata *pdata, const void *src, size_t len)
+{
+ uintptr_t ptr = (uintptr_t) src;
+
+ while (len) {
+ ssize_t ret = pdata->writefd(pdata, (void *) ptr, len);
+ if (ret < 0)
+ return ret;
+ if (!ret)
+ return -EPIPE;
+ ptr += ret;
+ len -= ret;
+ }
+
+ return ptr - (uintptr_t) src;
+}
+
+static ssize_t read_all(struct parser_pdata *pdata,
+ void *dst, size_t len)
+{
+ uintptr_t ptr = (uintptr_t) dst;
+
+ while (len) {
+ ssize_t ret = pdata->readfd(pdata, (void *) ptr, len);
+ if (ret < 0)
+ return ret;
+ if (!ret)
+ return -EPIPE;
+ ptr += ret;
+ len -= ret;
+ }
+
+ return ptr - (uintptr_t) dst;
+}
+
+static void print_value(struct parser_pdata *pdata, long value)
+{
+ if (pdata->verbose && value < 0) {
+ char buf[1024];
+ iio_strerror(-value, buf, sizeof(buf));
+ output(pdata, "ERROR: ");
+ output(pdata, buf);
+ output(pdata, "\n");
+ } else {
+ char buf[128];
+ sprintf(buf, "%li\n", value);
+ output(pdata, buf);
+ }
+}
+
+static ssize_t send_sample(const struct iio_channel *chn,
+ void *src, size_t length, void *d)
+{
+ struct sample_cb_info *info = d;
+ if (chn->index < 0 || !TEST_BIT(info->mask, chn->number))
+ return 0;
+ if (info->nb_bytes < length)
+ return 0;
+
+ if (info->cpt % length) {
+ unsigned int i, goal = length - info->cpt % length;
+ char zero = 0;
+ ssize_t ret;
+
+ for (i = 0; i < goal; i++) {
+ ret = info->pdata->writefd(info->pdata, &zero, 1);
+ if (ret < 0)
+ return ret;
+ }
+ info->cpt += goal;
+ }
+
+ info->cpt += length;
+ info->nb_bytes -= length;
+ return write_all(info->pdata, src, length);
+}
+
+static ssize_t receive_sample(const struct iio_channel *chn,
+ void *dst, size_t length, void *d)
+{
+ struct sample_cb_info *info = d;
+ if (chn->index < 0 || !TEST_BIT(info->mask, chn->number))
+ return 0;
+ if (info->cpt == info->nb_bytes)
+ return 0;
+
+ /* Skip the padding if needed */
+ if (info->cpt % length) {
+ unsigned int i, goal = length - info->cpt % length;
+ char foo;
+ ssize_t ret;
+
+ for (i = 0; i < goal; i++) {
+ ret = info->pdata->readfd(info->pdata, &foo, 1);
+ if (ret < 0)
+ return ret;
+ }
+ info->cpt += goal;
+ }
+
+ info->cpt += length;
+ return read_all(info->pdata, dst, length);
+}
+
+static ssize_t send_data(struct DevEntry *dev, struct ThdEntry *thd, size_t len)
+{
+ struct parser_pdata *pdata = thd->pdata;
+ bool demux = server_demux && dev->sample_size != thd->sample_size;
+
+ if (demux)
+ len = (len / dev->sample_size) * thd->sample_size;
+ if (len > thd->nb)
+ len = thd->nb;
+
+ print_value(pdata, len);
+
+ if (thd->new_client) {
+ unsigned int i;
+ char buf[129], *ptr = buf;
+ uint32_t *mask = demux ? thd->mask : dev->mask;
+ ssize_t ret;
+
+ /* Send the current mask */
+ for (i = dev->nb_words; i > 0 && ptr < buf + sizeof(buf);
+ i--, ptr += 8)
+ sprintf(ptr, "%08x", mask[i - 1]);
+
+ *ptr = '\n';
+ ret = write_all(pdata, buf, ptr + 1 - buf);
+ if (ret < 0)
+ return ret;
+
+ thd->new_client = false;
+ }
+
+ if (!demux) {
+ /* Short path */
+ return write_all(pdata, dev->buf->buffer, len);
+ } else {
+ struct sample_cb_info info = {
+ .pdata = pdata,
+ .cpt = 0,
+ .nb_bytes = len,
+ .mask = thd->mask,
+ };
+
+ return iio_buffer_foreach_sample(dev->buf, send_sample, &info);
+ }
+}
+
+static ssize_t receive_data(struct DevEntry *dev, struct ThdEntry *thd)
+{
+ struct parser_pdata *pdata = thd->pdata;
+
+ /* Inform that no error occured, and that we'll start reading data */
+ if (thd->new_client) {
+ print_value(thd->pdata, 0);
+ thd->new_client = false;
+ }
+
+ if (dev->sample_size == thd->sample_size) {
+ /* Short path: Receive directly in the buffer */
+
+ size_t len = dev->buf->length;
+ if (thd->nb < len)
+ len = thd->nb;
+
+ return read_all(pdata, dev->buf->buffer, len);
+ } else {
+ /* Long path: Mux the samples to the buffer */
+
+ struct sample_cb_info info = {
+ .pdata = pdata,
+ .cpt = 0,
+ .nb_bytes = thd->nb,
+ .mask = thd->mask,
+ };
+
+ return iio_buffer_foreach_sample(dev->buf,
+ receive_sample, &info);
+ }
+}
+
+static void dev_entry_put(struct DevEntry *entry)
+{
+ bool free_entry = false;
+
+ pthread_mutex_lock(&entry->thdlist_lock);
+ entry->ref_count--;
+ if (entry->ref_count == 0)
+ free_entry = true;
+ pthread_mutex_unlock(&entry->thdlist_lock);
+
+ if (free_entry) {
+ pthread_mutex_destroy(&entry->thdlist_lock);
+ pthread_cond_destroy(&entry->rw_ready_cond);
+
+ free(entry->mask);
+ free(entry);
+ }
+}
+
+static void signal_thread(struct ThdEntry *thd, ssize_t ret)
+{
+ thd->err = ret;
+ thd->nb = 0;
+ thd->active = false;
+ thd_entry_event_signal(thd);
+}
+
+static void rw_thd(struct thread_pool *pool, void *d)
+{
+ struct DevEntry *entry = d;
+ struct ThdEntry *thd, *next_thd;
+ struct iio_device *dev = entry->dev;
+ unsigned int nb_words = entry->nb_words;
+ ssize_t ret = 0;
+
+ DEBUG("R/W thread started for device %s\n",
+ dev->name ? dev->name : dev->id);
+
+ while (true) {
+ bool has_readers = false, has_writers = false,
+ mask_updated = false;
+ unsigned int sample_size;
+
+ /* NOTE: this while loop must exit with thdlist_lock locked. */
+ pthread_mutex_lock(&entry->thdlist_lock);
+
+ if (SLIST_EMPTY(&entry->thdlist_head))
+ break;
+
+ if (entry->update_mask) {
+ unsigned int i;
+ unsigned int samples_count = 0;
+
+ memset(entry->mask, 0, nb_words * sizeof(*entry->mask));
+ SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
+ for (i = 0; i < nb_words; i++)
+ entry->mask[i] |= thd->mask[i];
+
+ if (thd->samples_count > samples_count)
+ samples_count = thd->samples_count;
+ }
+
+ if (entry->buf)
+ iio_buffer_destroy(entry->buf);
+
+ for (i = 0; i < dev->nb_channels; i++) {
+ struct iio_channel *chn = dev->channels[i];
+ long index = chn->index;
+
+ if (index < 0)
+ continue;
+
+ if (TEST_BIT(entry->mask, chn->number))
+ iio_channel_enable(chn);
+ else
+ iio_channel_disable(chn);
+ }
+
+ entry->buf = iio_device_create_buffer(dev,
+ samples_count, entry->cyclic);
+ if (!entry->buf) {
+ ret = -errno;
+ ERROR("Unable to create buffer\n");
+ break;
+ }
+ entry->cancelled = false;
+
+ /* Signal the threads that we opened the device */
+ SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
+ if (thd->wait_for_open) {
+ thd->wait_for_open = false;
+ signal_thread(thd, 0);
+ }
+ }
+
+ DEBUG("IIO device %s reopened with new mask:\n",
+ dev->id);
+ for (i = 0; i < nb_words; i++)
+ DEBUG("Mask[%i] = 0x%08x\n", i, entry->mask[i]);
+ entry->update_mask = false;
+
+ entry->sample_size = iio_device_get_sample_size(dev);
+ mask_updated = true;
+ }
+
+ sample_size = entry->sample_size;
+
+ SLIST_FOREACH(thd, &entry->thdlist_head, dev_list_entry) {
+ thd->active = !thd->err && thd->nb >= sample_size;
+ if (mask_updated && thd->active)
+ signal_thread(thd, thd->nb);
+
+ if (thd->is_writer)
+ has_writers |= thd->active;
+ else
+ has_readers |= thd->active;
+ }
+
+ if (!has_readers && !has_writers) {
+ pthread_cond_wait(&entry->rw_ready_cond,
+ &entry->thdlist_lock);
+ }
+
+ pthread_mutex_unlock(&entry->thdlist_lock);
+
+ if (!has_readers && !has_writers)
+ continue;
+
+ if (has_readers) {
+ ssize_t nb_bytes;
+
+ ret = iio_buffer_refill(entry->buf);
+
+ pthread_mutex_lock(&entry->thdlist_lock);
+
+ /*
+ * When the last client disconnects the buffer is
+ * cancelled and iio_buffer_refill() returns an error. A
+ * new client might have connected before we got here
+ * though, in that case the rw thread has to stay active
+ * and a new buffer is created. If the list is still empty the loop
+ * will exit normally.
+ */
+ if (entry->cancelled) {
+ pthread_mutex_unlock(&entry->thdlist_lock);
+ continue;
+ }
+
+ if (ret < 0) {
+ ERROR("Reading from device failed: %i\n",
+ (int) ret);
+ break;
+ }
+
+ nb_bytes = ret;
+
+ /* We don't use SLIST_FOREACH here. As soon as a thread is
+ * signaled, its "thd" structure might be freed;
+ * SLIST_FOREACH would then cause a segmentation fault, as it
+ * reads "thd" to get the address of the next element. */
+ for (thd = SLIST_FIRST(&entry->thdlist_head);
+ thd; thd = next_thd) {
+ next_thd = SLIST_NEXT(thd, dev_list_entry);
+
+ if (!thd->active || thd->is_writer)
+ continue;
+
+ ret = send_data(entry, thd, nb_bytes);
+ if (ret > 0)
+ thd->nb -= ret;
+
+ if (ret < 0 || thd->nb < sample_size)
+ signal_thread(thd, (ret < 0) ?
+ ret : thd->nb);
+ }
+
+ pthread_mutex_unlock(&entry->thdlist_lock);
+ }
+
+ if (has_writers) {
+ ssize_t nb_bytes = 0;
+
+ pthread_mutex_lock(&entry->thdlist_lock);
+
+ /* Reset the size of the buffer to its maximum size */
+ entry->buf->data_length = entry->buf->length;
+
+ /* Same comment as above */
+ for (thd = SLIST_FIRST(&entry->thdlist_head);
+ thd; thd = next_thd) {
+ next_thd = SLIST_NEXT(thd, dev_list_entry);
+
+ if (!thd->active || !thd->is_writer)
+ continue;
+
+ ret = receive_data(entry, thd);
+ if (ret > 0) {
+ thd->nb -= ret;
+ if (ret > nb_bytes)
+ nb_bytes = ret;
+ }
+
+ if (ret < 0)
+ signal_thread(thd, ret);
+ }
+
+ ret = iio_buffer_push_partial(entry->buf,
+ nb_bytes / sample_size);
+ if (entry->cancelled) {
+ pthread_mutex_unlock(&entry->thdlist_lock);
+ continue;
+ }
+ if (ret < 0) {
+ ERROR("Writing to device failed: %i\n",
+ (int) ret);
+ break;
+ }
+
+ /* Signal threads which completed their RW command */
+ for (thd = SLIST_FIRST(&entry->thdlist_head);
+ thd; thd = next_thd) {
+ next_thd = SLIST_NEXT(thd, dev_list_entry);
+ if (thd->active && thd->is_writer &&
+ thd->nb < sample_size)
+ signal_thread(thd, thd->nb);
+ }
+
+ pthread_mutex_unlock(&entry->thdlist_lock);
+ }
+ }
+
+ /* Signal all remaining threads */
+ for (thd = SLIST_FIRST(&entry->thdlist_head); thd; thd = next_thd) {
+ next_thd = SLIST_NEXT(thd, dev_list_entry);
+ SLIST_REMOVE(&entry->thdlist_head, thd, ThdEntry, dev_list_entry);
+ thd->wait_for_open = false;
+ signal_thread(thd, ret);
+ }
+ if (entry->buf) {
+ iio_buffer_destroy(entry->buf);
+ entry->buf = NULL;
+ }
+ entry->closed = true;
+ pthread_mutex_unlock(&entry->thdlist_lock);
+
+ pthread_mutex_lock(&devlist_lock);
+ /* It is possible that a new thread has already started, make sure to
+ * not overwrite it. */
+ if (iio_device_get_data(dev) == entry)
+ iio_device_set_data(dev, NULL);
+ pthread_mutex_unlock(&devlist_lock);
+
+ DEBUG("Stopping R/W thread for device %s\n",
+ dev->name ? dev->name : dev->id);
+
+ dev_entry_put(entry);
+}
+
+static struct ThdEntry *parser_lookup_thd_entry(struct parser_pdata *pdata,
+ struct iio_device *dev)
+{
+ struct ThdEntry *t;
+
+ SLIST_FOREACH(t, &pdata->thdlist_head, parser_list_entry) {
+ if (t->dev == dev)
+ return t;
+ }
+
+ return NULL;
+}
+
+static ssize_t rw_buffer(struct parser_pdata *pdata,
+ struct iio_device *dev, unsigned int nb, bool is_write)
+{
+ struct DevEntry *entry;
+ struct ThdEntry *thd;
+ ssize_t ret;
+
+ if (!dev)
+ return -ENODEV;
+
+ thd = parser_lookup_thd_entry(pdata, dev);
+ if (!thd)
+ return -EBADF;
+
+ entry = thd->entry;
+
+ if (nb < entry->sample_size)
+ return 0;
+
+ pthread_mutex_lock(&entry->thdlist_lock);
+ if (entry->closed) {
+ pthread_mutex_unlock(&entry->thdlist_lock);
+ return -EBADF;
+ }
+
+ if (thd->nb) {
+ pthread_mutex_unlock(&entry->thdlist_lock);
+ return -EBUSY;
+ }
+
+ thd->new_client = true;
+ thd->nb = nb;
+ thd->err = 0;
+ thd->is_writer = is_write;
+ thd->active = true;
+
+ pthread_cond_signal(&entry->rw_ready_cond);
+
+ DEBUG("Waiting for completion...\n");
+ while (thd->active) {
+ ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
+ if (ret)
+ break;
+ }
+ if (ret == 0)
+ ret = thd->err;
+ pthread_mutex_unlock(&entry->thdlist_lock);
+
+ if (ret > 0 && ret < nb)
+ print_value(thd->pdata, 0);
+
+ DEBUG("Exiting rw_buffer with code %li\n", (long) ret);
+ if (ret < 0)
+ return ret;
+ else
+ return nb - ret;
+}
+
+static uint32_t *get_mask(const char *mask, size_t *len)
+{
+ size_t nb = (*len + 7) / 8;
+ uint32_t *ptr, *words = calloc(nb, sizeof(*words));
+ if (!words)
+ return NULL;
+
+ ptr = words + nb;
+ while (*mask) {
+ char buf[9];
+ sprintf(buf, "%.*s", 8, mask);
+ sscanf(buf, "%08x", --ptr);
+ mask += 8;
+ DEBUG("Mask[%lu]: 0x%08x\n",
+ (unsigned long) (words - ptr) / 4, *ptr);
+ }
+
+ *len = nb;
+ return words;
+}
+
+static void free_thd_entry(struct ThdEntry *t)
+{
+ close(t->eventfd);
+ free(t->mask);
+ free(t);
+}
+
+static void remove_thd_entry(struct ThdEntry *t)
+{
+ struct DevEntry *entry = t->entry;
+
+ pthread_mutex_lock(&entry->thdlist_lock);
+ if (!entry->closed) {
+ entry->update_mask = true;
+ SLIST_REMOVE(&entry->thdlist_head, t, ThdEntry, dev_list_entry);
+ if (SLIST_EMPTY(&entry->thdlist_head) && entry->buf) {
+ entry->cancelled = true;
+ iio_buffer_cancel(entry->buf); /* Wakeup the rw thread */
+ }
+
+ pthread_cond_signal(&entry->rw_ready_cond);
+ }
+ pthread_mutex_unlock(&entry->thdlist_lock);
+ dev_entry_put(entry);
+
+ free_thd_entry(t);
+}
+
+static int open_dev_helper(struct parser_pdata *pdata, struct iio_device *dev,
+ size_t samples_count, const char *mask, bool cyclic)
+{
+ int ret = -ENOMEM;
+ struct DevEntry *entry;
+ struct ThdEntry *thd;
+ size_t len = strlen(mask);
+ uint32_t *words;
+ unsigned int nb_channels;
+ unsigned int cyclic_retry = 500;
+
+ if (!dev)
+ return -ENODEV;
+
+ nb_channels = dev->nb_channels;
+ if (len != ((nb_channels + 31) / 32) * 8)
+ return -EINVAL;
+
+ words = get_mask(mask, &len);
+ if (!words)
+ return -ENOMEM;
+
+ thd = zalloc(sizeof(*thd));
+ if (!thd)
+ goto err_free_words;
+
+ thd->wait_for_open = true;
+ thd->mask = words;
+ thd->nb = 0;
+ thd->samples_count = samples_count;
+ thd->sample_size = iio_device_get_sample_size_mask(dev, words, len);
+ thd->pdata = pdata;
+ thd->dev = dev;
+ thd->eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+
+retry:
+ /* Atomically look up the thread and make sure that it is still active
+ * or allocate new one. */
+ pthread_mutex_lock(&devlist_lock);
+ entry = iio_device_get_data(dev);
+ if (entry) {
+ if (cyclic || entry->cyclic) {
+ /* Only one client allowed in cyclic mode */
+ pthread_mutex_unlock(&devlist_lock);
+
+ /* There is an inherent race condition if a client
+ * creates a new cyclic buffer shortly after destroying
+ * a previous. E.g. like
+ *
+ * iio_buffer_destroy(buf);
+ * buf = iio_device_create_buffer(dev, n, true);
+ *
+ * In this case the two buffers each use their own
+ * communication channel which are unordered to each
+ * other. E.g. the socket open might arrive before the
+ * socket close on the host side, even though they were
+ * sent in the opposite order on the client side. This
+ * race condition can cause an error being reported back
+ * to the client, even though the code on the client
+ * side was well formed and would work fine e.g. using
+ * the local backend.
+ *
+ * To avoid this issue go to sleep for up to 50ms in
+ * intervals of 100us. This should be enough time for
+ * the issue to resolve itself. If there actually is
+ * contention on the buffer an error will eventually be
+ * returned in which case the additional delay cause by
+ * the retires should not matter too much.
+ *
+ * This is not pretty but it works.
+ */
+ if (cyclic_retry) {
+ cyclic_retry--;
+ usleep(100);
+ goto retry;
+ }
+
+ ret = -EBUSY;
+ goto err_free_thd;
+ }
+
+ pthread_mutex_lock(&entry->thdlist_lock);
+ if (!entry->closed) {
+ pthread_mutex_unlock(&devlist_lock);
+
+ entry->ref_count++;
+
+ SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
+ thd->entry = entry;
+ entry->update_mask = true;
+ DEBUG("Added thread to client list\n");
+
+ pthread_cond_signal(&entry->rw_ready_cond);
+
+ /* Wait until the device is opened by the rw thread */
+ while (thd->wait_for_open) {
+ ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
+ if (ret)
+ break;
+ }
+ pthread_mutex_unlock(&entry->thdlist_lock);
+
+ if (ret == 0)
+ ret = (int) thd->err;
+ if (ret < 0)
+ remove_thd_entry(thd);
+ else
+ SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
+ return ret;
+ } else {
+ pthread_mutex_unlock(&entry->thdlist_lock);
+ }
+ }
+
+ entry = zalloc(sizeof(*entry));
+ if (!entry) {
+ pthread_mutex_unlock(&devlist_lock);
+ goto err_free_thd;
+ }
+
+ entry->ref_count = 2; /* One for thread, one for the client */
+
+ entry->mask = malloc(len * sizeof(*words));
+ if (!entry->mask) {
+ pthread_mutex_unlock(&devlist_lock);
+ goto err_free_entry;
+ }
+
+ entry->cyclic = cyclic;
+ entry->nb_words = len;
+ entry->update_mask = true;
+ entry->dev = dev;
+ entry->buf = NULL;
+ SLIST_INIT(&entry->thdlist_head);
+ SLIST_INSERT_HEAD(&entry->thdlist_head, thd, dev_list_entry);
+ thd->entry = entry;
+ DEBUG("Added thread to client list\n");
+
+ pthread_mutex_init(&entry->thdlist_lock, NULL);
+ pthread_cond_init(&entry->rw_ready_cond, NULL);
+
+ ret = thread_pool_add_thread(main_thread_pool, rw_thd, entry, "rw_thd");
+ if (ret) {
+ pthread_mutex_unlock(&devlist_lock);
+ goto err_free_entry_mask;
+ }
+
+ DEBUG("Adding new device thread to device list\n");
+ iio_device_set_data(dev, entry);
+ pthread_mutex_unlock(&devlist_lock);
+
+ pthread_mutex_lock(&entry->thdlist_lock);
+ /* Wait until the device is opened by the rw thread */
+ while (thd->wait_for_open) {
+ ret = thd_entry_event_wait(thd, &entry->thdlist_lock, pdata->fd_in);
+ if (ret)
+ break;
+ }
+ pthread_mutex_unlock(&entry->thdlist_lock);
+
+ if (ret == 0)
+ ret = (int) thd->err;
+ if (ret < 0)
+ remove_thd_entry(thd);
+ else
+ SLIST_INSERT_HEAD(&pdata->thdlist_head, thd, parser_list_entry);
+ return ret;
+
+err_free_entry_mask:
+ free(entry->mask);
+err_free_entry:
+ free(entry);
+err_free_thd:
+ close(thd->eventfd);
+ free(thd);
+err_free_words:
+ free(words);
+ return ret;
+}
+
+static int close_dev_helper(struct parser_pdata *pdata, struct iio_device *dev)
+{
+ struct ThdEntry *t;
+
+ if (!dev)
+ return -ENODEV;
+
+ t = parser_lookup_thd_entry(pdata, dev);
+ if (!t)
+ return -ENXIO;
+
+ SLIST_REMOVE(&pdata->thdlist_head, t, ThdEntry, parser_list_entry);
+ remove_thd_entry(t);
+
+ return 0;
+}
+
+int open_dev(struct parser_pdata *pdata, struct iio_device *dev,
+ size_t samples_count, const char *mask, bool cyclic)
+{
+ int ret = open_dev_helper(pdata, dev, samples_count, mask, cyclic);
+ print_value(pdata, ret);
+ return ret;
+}
+
+int close_dev(struct parser_pdata *pdata, struct iio_device *dev)
+{
+ int ret = close_dev_helper(pdata, dev);
+ print_value(pdata, ret);
+ return ret;
+}
+
+ssize_t rw_dev(struct parser_pdata *pdata, struct iio_device *dev,
+ unsigned int nb, bool is_write)
+{
+ ssize_t ret = rw_buffer(pdata, dev, nb, is_write);
+ if (ret <= 0 || is_write)
+ print_value(pdata, ret);
+ return ret;
+}
+
+ssize_t read_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
+ const char *attr, enum iio_attr_type type)
+{
+ /* We use a very large buffer here, as if attr is NULL all the
+ * attributes will be read, which may represents a few kilobytes worth
+ * of data. */
+ char buf[0x10000];
+ ssize_t ret = -EINVAL;
+
+ if (!dev) {
+ print_value(pdata, -ENODEV);
+ return -ENODEV;
+ }
+
+ switch (type) {
+ case IIO_ATTR_TYPE_DEVICE:
+ ret = iio_device_attr_read(dev, attr, buf, sizeof(buf) - 1);
+ break;
+ case IIO_ATTR_TYPE_DEBUG:
+ ret = iio_device_debug_attr_read(dev,
+ attr, buf, sizeof(buf) - 1);
+ break;
+ case IIO_ATTR_TYPE_BUFFER:
+ ret = iio_device_buffer_attr_read(dev,
+ attr, buf, sizeof(buf) - 1);
+ break;
+ default:
+ ret = -EINVAL;
+ break;
+ }
+ print_value(pdata, ret);
+ if (ret < 0)
+ return ret;
+
+ buf[ret] = '\n';
+ return write_all(pdata, buf, ret + 1);
+}
+
+ssize_t write_dev_attr(struct parser_pdata *pdata, struct iio_device *dev,
+ const char *attr, size_t len, enum iio_attr_type type)
+{
+ ssize_t ret = -ENOMEM;
+ char *buf;
+
+ if (!dev) {
+ ret = -ENODEV;
+ goto err_print_value;
+ }
+
+ buf = malloc(len);
+ if (!buf)
+ goto err_print_value;
+
+ ret = read_all(pdata, buf, len);
+ if (ret < 0)
+ goto err_free_buffer;
+
+ switch (type) {
+ case IIO_ATTR_TYPE_DEVICE:
+ ret = iio_device_attr_write_raw(dev, attr, buf, len);
+ break;
+ case IIO_ATTR_TYPE_DEBUG:
+ ret = iio_device_debug_attr_write_raw(dev, attr, buf, len);
+ break;
+ case IIO_ATTR_TYPE_BUFFER:
+ ret = iio_device_buffer_attr_write_raw(dev, attr, buf, len);
+ break;
+ default:
+ ret = -EINVAL;
+ break;
+ }
+
+err_free_buffer:
+ free(buf);
+err_print_value:
+ print_value(pdata, ret);
+ return ret;
+}
+
+ssize_t read_chn_attr(struct parser_pdata *pdata,
+ struct iio_channel *chn, const char *attr)
+{
+ char buf[1024];
+ ssize_t ret = -ENODEV;
+
+ if (chn)
+ ret = iio_channel_attr_read(chn, attr, buf, sizeof(buf) - 1);
+ else if (pdata->dev)
+ ret = -ENXIO;
+ print_value(pdata, ret);
+ if (ret < 0)
+ return ret;
+
+ buf[ret] = '\n';
+ return write_all(pdata, buf, ret + 1);
+}
+
+ssize_t write_chn_attr(struct parser_pdata *pdata,
+ struct iio_channel *chn, const char *attr, size_t len)
+{
+ ssize_t ret = -ENOMEM;
+ char *buf = malloc(len);
+ if (!buf)
+ goto err_print_value;
+
+ ret = read_all(pdata, buf, len);
+ if (ret < 0)
+ goto err_free_buffer;
+
+ if (chn)
+ ret = iio_channel_attr_write_raw(chn, attr, buf, len);
+ else if (pdata->dev)
+ ret = -ENXIO;
+ else
+ ret = -ENODEV;
+err_free_buffer:
+ free(buf);
+err_print_value:
+ print_value(pdata, ret);
+ return ret;
+}
+
+ssize_t set_trigger(struct parser_pdata *pdata,
+ struct iio_device *dev, const char *trigger)
+{
+ struct iio_device *trig = NULL;
+ ssize_t ret = -ENOENT;
+
+ if (!dev) {
+ ret = -ENODEV;
+ goto err_print_value;
+ }
+
+ if (trigger) {
+ trig = iio_context_find_device(pdata->ctx, trigger);
+ if (!trig)
+ goto err_print_value;
+ }
+
+ ret = iio_device_set_trigger(dev, trig);
+err_print_value:
+ print_value(pdata, ret);
+ return ret;
+}
+
+ssize_t get_trigger(struct parser_pdata *pdata, struct iio_device *dev)
+{
+ const struct iio_device *trigger;
+ ssize_t ret;
+
+ if (!dev) {
+ print_value(pdata, -ENODEV);
+ return -ENODEV;
+ }
+
+ ret = iio_device_get_trigger(dev, &trigger);
+ if (!ret && trigger) {
+ char buf[256];
+
+ ret = strlen(trigger->name);
+ print_value(pdata, ret);
+
+ snprintf(buf, sizeof(buf), "%s\n", trigger->name);
+ ret = write_all(pdata, buf, ret + 1);
+ } else {
+ print_value(pdata, ret);
+ }
+ return ret;
+}
+
+int set_timeout(struct parser_pdata *pdata, unsigned int timeout)
+{
+ int ret = iio_context_set_timeout(pdata->ctx, timeout);
+ print_value(pdata, ret);
+ return ret;
+}
+
+int set_buffers_count(struct parser_pdata *pdata,
+ struct iio_device *dev, long value)
+{
+ int ret = -EINVAL;
+
+ if (!dev) {
+ ret = -ENODEV;
+ goto err_print_value;
+ }
+
+ if (value >= 1)
+ ret = iio_device_set_kernel_buffers_count(
+ dev, (unsigned int) value);
+err_print_value:
+ print_value(pdata, ret);
+ return ret;
+}
+
+ssize_t read_line(struct parser_pdata *pdata, char *buf, size_t len)
+{
+ ssize_t ret;
+
+ if (pdata->fd_in_is_socket) {
+ struct pollfd pfd[2];
+ bool found;
+ size_t bytes_read = 0;
+
+ pfd[0].fd = pdata->fd_in;
+ pfd[0].events = POLLIN | POLLRDHUP;
+ pfd[0].revents = 0;
+ pfd[1].fd = thread_pool_get_poll_fd(pdata->pool);
+ pfd[1].events = POLLIN;
+ pfd[1].revents = 0;
+
+ do {
+ size_t i, to_trunc;
+
+ poll_nointr(pfd, 2);
+
+ if (pfd[1].revents & POLLIN ||
+ pfd[0].revents & POLLRDHUP)
+ return 0;
+
+ /* First read from the socket, without advancing the
+ * read offset */
+ ret = recv(pdata->fd_in, buf, len,
+ MSG_NOSIGNAL | MSG_PEEK);
+ if (ret < 0)
+ return -errno;
+
+ /* Lookup for the trailing \n */
+ for (i = 0; i < (size_t) ret && buf[i] != '\n'; i++);
+ found = i < (size_t) ret;
+
+ len -= ret;
+ buf += ret;
+
+ to_trunc = found ? i + 1 : (size_t) ret;
+
+ /* Advance the read offset after the \n if found, or
+ * after the last character read otherwise */
+ ret = recv(pdata->fd_in, NULL, to_trunc,
+ MSG_NOSIGNAL | MSG_TRUNC);
+ if (ret < 0)
+ return -errno;
+
+ bytes_read += to_trunc;
+ } while (!found && len);
+
+ /* No \n found? Just garbage data */
+ if (!found)
+ ret = -EIO;
+ else
+ ret = bytes_read;
+ } else {
+ ret = pdata->readfd(pdata, buf, len);
+ }
+
+ return ret;
+}
+
+void interpreter(struct iio_context *ctx, int fd_in, int fd_out, bool verbose,
+ bool is_socket, bool use_aio, struct thread_pool *pool)
+{
+ yyscan_t scanner;
+ struct parser_pdata pdata;
+ unsigned int i;
+ int ret;
+
+ pdata.ctx = ctx;
+ pdata.stop = false;
+ pdata.fd_in = fd_in;
+ pdata.fd_out = fd_out;
+ pdata.verbose = verbose;
+ pdata.pool = pool;
+
+ pdata.fd_in_is_socket = is_socket;
+ pdata.fd_out_is_socket = is_socket;
+
+ SLIST_INIT(&pdata.thdlist_head);
+
+ if (use_aio) {
+ /* Note: if WITH_AIO is not defined, use_aio is always false.
+ * We ensure that in iiod.c. */
+#if WITH_AIO
+ char err_str[1024];
+
+ pdata.aio_eventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
+ if (pdata.aio_eventfd < 0) {
+ iio_strerror(errno, err_str, sizeof(err_str));
+ ERROR("Failed to create AIO eventfd: %s\n", err_str);
+ return;
+ }
+
+ pdata.aio_ctx = 0;
+ ret = io_setup(1, &pdata.aio_ctx);
+ if (ret < 0) {
+ iio_strerror(-ret, err_str, sizeof(err_str));
+ ERROR("Failed to create AIO context: %s\n", err_str);
+ close(pdata.aio_eventfd);
+ return;
+ }
+ pthread_mutex_init(&pdata.aio_mutex, NULL);
+ pdata.readfd = readfd_aio;
+ pdata.writefd = writefd_aio;
+#endif
+ } else {
+ pdata.readfd = readfd_io;
+ pdata.writefd = writefd_io;
+ }
+
+ yylex_init_extra(&pdata, &scanner);
+
+ do {
+ if (verbose)
+ output(&pdata, "iio-daemon > ");
+ ret = yyparse(scanner);
+ } while (!pdata.stop && ret >= 0);
+
+ yylex_destroy(scanner);
+
+ /* Close all opened devices */
+ for (i = 0; i < ctx->nb_devices; i++)
+ close_dev_helper(&pdata, ctx->devices[i]);
+
+#if WITH_AIO
+ if (use_aio) {
+ io_destroy(pdata.aio_ctx);
+ close(pdata.aio_eventfd);
+ }
+#endif
+}