| /* |
| * Copyright (c) 2015 Carlos Pizano-Uribe [email protected] |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining |
| * a copy of this software and associated documentation files |
| * (the "Software"), to deal in the Software without restriction, |
| * including without limitation the rights to use, copy, modify, merge, |
| * publish, distribute, sublicense, and/or sell copies of the Software, |
| * and to permit persons to whom the Software is furnished to do so, |
| * subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be |
| * included in all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
| * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
| * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
| * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY |
| * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, |
| * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE |
| * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
| */ |
| |
| /** |
| * @file |
| * @brief Port object functions |
| * @defgroup event Events |
| * |
| */ |
| |
| #include <debug.h> |
| #include <list.h> |
| #include <malloc.h> |
| #include <string.h> |
| #include <pow2.h> |
| #include <err.h> |
| #include <kernel/thread.h> |
| #include <kernel/port.h> |
| |
| // write ports can be in two states, open and closed, which have a |
| // different magic number. |
| |
| #define WRITEPORT_MAGIC_W (0x70727477) // 'prtw' |
| #define WRITEPORT_MAGIC_X (0x70727478) // 'prtx' |
| |
| #define READPORT_MAGIC (0x70727472) // 'prtr' |
| #define PORTGROUP_MAGIC (0x70727467) // 'prtg' |
| |
| #define PORT_BUFF_SIZE 8 |
| #define PORT_BUFF_SIZE_BIG 64 |
| |
| #define RESCHEDULE_POLICY 1 |
| |
| #define MAX_PORT_GROUP_COUNT 256 |
| |
| typedef struct { |
| uint log2; |
| uint avail; |
| uint head; |
| uint tail; |
| port_packet_t packet[1]; |
| } port_buf_t; |
| |
| typedef struct { |
| int magic; |
| struct list_node node; |
| port_buf_t *buf; |
| struct list_node rp_list; |
| port_mode_t mode; |
| char name[PORT_NAME_LEN]; |
| } write_port_t; |
| |
| typedef struct { |
| int magic; |
| wait_queue_t wait; |
| struct list_node rp_list; |
| } port_group_t; |
| |
| typedef struct { |
| int magic; |
| struct list_node w_node; |
| struct list_node g_node; |
| port_buf_t *buf; |
| void *ctx; |
| wait_queue_t wait; |
| write_port_t *wport; |
| port_group_t *gport; |
| } read_port_t; |
| |
| |
| static struct list_node write_port_list; |
| |
| |
| static port_buf_t *make_buf(uint pk_count) |
| { |
| uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t)); |
| port_buf_t *buf = (port_buf_t *) malloc(size); |
| if (!buf) |
| return NULL; |
| buf->log2 = log2_uint(pk_count); |
| buf->head = buf->tail = 0; |
| buf->avail = pk_count; |
| return buf; |
| } |
| |
| static inline bool buf_is_empty(port_buf_t *buf) |
| { |
| return buf->avail == valpow2(buf->log2); |
| } |
| |
| static status_t buf_write(port_buf_t *buf, const port_packet_t *packets, size_t count) |
| { |
| if (buf->avail < count) |
| return ERR_NOT_ENOUGH_BUFFER; |
| |
| for (size_t ix = 0; ix != count; ix++) { |
| buf->packet[buf->tail] = packets[ix]; |
| buf->tail = modpow2(++buf->tail, buf->log2); |
| } |
| buf->avail -= count; |
| return NO_ERROR; |
| } |
| |
| static status_t buf_read(port_buf_t *buf, port_result_t *pr) |
| { |
| if (buf_is_empty(buf)) |
| return ERR_NO_MSG; |
| pr->packet = buf->packet[buf->head]; |
| buf->head = modpow2(++buf->head, buf->log2); |
| ++buf->avail; |
| return NO_ERROR; |
| } |
| |
| // must be called before any use of ports. |
| void port_init(void) |
| { |
| list_initialize(&write_port_list); |
| } |
| |
| status_t port_create(const char *name, port_mode_t mode, port_t *port) |
| { |
| if (!name || !port) |
| return ERR_INVALID_ARGS; |
| |
| // only unicast ports can have a large buffer. |
| if (mode & PORT_MODE_BROADCAST) { |
| if (mode & PORT_MODE_BIG_BUFFER) |
| return ERR_INVALID_ARGS; |
| } |
| |
| if (strlen(name) >= PORT_NAME_LEN) |
| return ERR_INVALID_ARGS; |
| |
| // lookup for existing port, return that if found. |
| write_port_t *wp = NULL; |
| THREAD_LOCK(state1); |
| list_for_every_entry(&write_port_list, wp, write_port_t, node) { |
| if (strcmp(wp->name, name) == 0) { |
| // can't return closed ports. |
| if (wp->magic == WRITEPORT_MAGIC_X) |
| wp = NULL; |
| THREAD_UNLOCK(state1); |
| if (wp) { |
| *port = (void *) wp; |
| return ERR_ALREADY_EXISTS; |
| } else { |
| return ERR_BUSY; |
| } |
| } |
| } |
| THREAD_UNLOCK(state1); |
| |
| // not found, create the write port and the circular buffer. |
| wp = calloc(1, sizeof(write_port_t)); |
| if (!wp) |
| return ERR_NO_MEMORY; |
| |
| wp->magic = WRITEPORT_MAGIC_W; |
| wp->mode = mode; |
| strlcpy(wp->name, name, sizeof(wp->name)); |
| list_initialize(&wp->rp_list); |
| |
| uint size = (mode & PORT_MODE_BIG_BUFFER) ? PORT_BUFF_SIZE_BIG : PORT_BUFF_SIZE; |
| wp->buf = make_buf(size); |
| if (!wp->buf) { |
| free(wp); |
| return ERR_NO_MEMORY; |
| } |
| |
| // todo: race condtion! a port with the same name could have been created |
| // by another thread at is point. |
| THREAD_LOCK(state2); |
| list_add_tail(&write_port_list, &wp->node); |
| THREAD_UNLOCK(state2); |
| |
| *port = (void *)wp; |
| return NO_ERROR; |
| } |
| |
| status_t port_open(const char *name, void *ctx, port_t *port) |
| { |
| if (!name || !port) |
| return ERR_INVALID_ARGS; |
| |
| // assume success; create the read port and buffer now. |
| read_port_t *rp = calloc(1, sizeof(read_port_t)); |
| if (!rp) |
| return ERR_NO_MEMORY; |
| |
| rp->magic = READPORT_MAGIC; |
| wait_queue_init(&rp->wait); |
| rp->ctx = ctx; |
| |
| // |buf| might not be needed, but we always allocate outside the lock. |
| // this buffer is only needed for broadcast ports, but we don't know |
| // that here. |
| port_buf_t *buf = make_buf(PORT_BUFF_SIZE); |
| if (!buf) { |
| free(rp); |
| return ERR_NO_MEMORY; |
| } |
| |
| // find the named write port and associate it with read port. |
| status_t rc = ERR_NOT_FOUND; |
| |
| THREAD_LOCK(state); |
| write_port_t *wp = NULL; |
| list_for_every_entry(&write_port_list, wp, write_port_t, node) { |
| if (strcmp(wp->name, name) == 0) { |
| // found; add read port to write port list. |
| rp->wport = wp; |
| if (wp->buf) { |
| // this is the first read port; transfer the circular buffer. |
| list_add_tail(&wp->rp_list, &rp->w_node); |
| rp->buf = wp->buf; |
| wp->buf = NULL; |
| rc = NO_ERROR; |
| } else if (buf) { |
| // not first read port. |
| if (wp->mode & PORT_MODE_UNICAST) { |
| // cannot add a second listener. |
| rc = ERR_NOT_ALLOWED; |
| break; |
| } |
| // use the new (small) circular buffer. |
| list_add_tail(&wp->rp_list, &rp->w_node); |
| rp->buf = buf; |
| buf = NULL; |
| rc = NO_ERROR; |
| } else { |
| // |buf| allocation failed and the buffer was needed. |
| rc = ERR_NO_MEMORY; |
| } |
| break; |
| } |
| } |
| THREAD_UNLOCK(state); |
| |
| if (buf) |
| free(buf); |
| |
| if (rc == NO_ERROR) { |
| *port = (void *)rp; |
| } else { |
| free(rp); |
| } |
| return rc; |
| } |
| |
| status_t port_group(port_t *ports, size_t count, port_t *group) |
| { |
| if (count > MAX_PORT_GROUP_COUNT) |
| return ERR_TOO_BIG; |
| |
| // Allow empty port groups. |
| if (count && !ports) |
| return ERR_INVALID_ARGS; |
| |
| if (!group) |
| return ERR_INVALID_ARGS; |
| |
| // assume success; create port group now. |
| port_group_t *pg = calloc(1, sizeof(port_group_t)); |
| if (!pg) |
| return ERR_NO_MEMORY; |
| |
| pg->magic = PORTGROUP_MAGIC; |
| wait_queue_init(&pg->wait); |
| list_initialize(&pg->rp_list); |
| |
| status_t rc = NO_ERROR; |
| |
| THREAD_LOCK(state); |
| for (size_t ix = 0; ix != count; ix++) { |
| read_port_t *rp = (read_port_t *)ports[ix]; |
| if ((rp->magic != READPORT_MAGIC) || rp->gport) { |
| // wrong type of port, or port already part of a group, |
| // in any case, undo the changes to the previous read ports. |
| for (size_t jx = 0; jx != ix; jx++) { |
| ((read_port_t *)ports[jx])->gport = NULL; |
| } |
| rc = ERR_BAD_HANDLE; |
| break; |
| } |
| // link port group and read port. |
| rp->gport = pg; |
| list_add_tail(&pg->rp_list, &rp->g_node); |
| } |
| THREAD_UNLOCK(state); |
| |
| if (rc == NO_ERROR) { |
| *group = (port_t *)pg; |
| } else { |
| free(pg); |
| } |
| return rc; |
| } |
| |
| status_t port_group_add(port_t group, port_t port) |
| { |
| if (!port || !group) |
| return ERR_INVALID_ARGS; |
| |
| // Make sure the user has actually passed in a port group and a read-port. |
| port_group_t *pg = (port_group_t *)group; |
| if (pg->magic != PORTGROUP_MAGIC) |
| return ERR_INVALID_ARGS; |
| |
| read_port_t *rp = (read_port_t *)port; |
| if (rp->magic != READPORT_MAGIC || rp->gport) |
| return ERR_BAD_HANDLE; |
| |
| status_t rc = NO_ERROR; |
| THREAD_LOCK(state); |
| |
| if (list_length(&pg->rp_list) == MAX_PORT_GROUP_COUNT) { |
| rc = ERR_TOO_BIG; |
| } else { |
| rp->gport = pg; |
| list_add_tail(&pg->rp_list, &rp->g_node); |
| |
| // If the new read port being added has messages available, try to wake |
| // any readers that might be present. |
| if (!buf_is_empty(rp->buf)) { |
| wait_queue_wake_one(&pg->wait, false, NO_ERROR); |
| } |
| } |
| |
| THREAD_UNLOCK(state); |
| |
| return rc; |
| } |
| |
| status_t port_group_remove(port_t group, port_t port) |
| { |
| if (!port || !group) |
| return ERR_INVALID_ARGS; |
| |
| // Make sure the user has actually passed in a port group and a read-port. |
| port_group_t *pg = (port_group_t *)group; |
| if (pg->magic != PORTGROUP_MAGIC) |
| return ERR_INVALID_ARGS; |
| |
| read_port_t *rp = (read_port_t *)port; |
| if (rp->magic != READPORT_MAGIC || rp->gport != pg) |
| return ERR_BAD_HANDLE; |
| |
| THREAD_LOCK(state); |
| |
| bool found = false; |
| read_port_t *current_rp; |
| list_for_every_entry(&pg->rp_list, current_rp, read_port_t, g_node) { |
| if (current_rp == rp) { |
| found = true; |
| } |
| } |
| |
| if (!found) |
| return ERR_BAD_HANDLE; |
| |
| list_delete(&rp->g_node); |
| |
| THREAD_UNLOCK(state); |
| |
| return NO_ERROR; |
| } |
| |
| status_t port_write(port_t port, const port_packet_t *pk, size_t count) |
| { |
| if (!port || !pk) |
| return ERR_INVALID_ARGS; |
| |
| write_port_t *wp = (write_port_t *)port; |
| THREAD_LOCK(state); |
| if (wp->magic != WRITEPORT_MAGIC_W) { |
| // wrong port type. |
| THREAD_UNLOCK(state); |
| return ERR_BAD_HANDLE; |
| } |
| |
| status_t status = NO_ERROR; |
| int awake_count = 0; |
| |
| if (wp->buf) { |
| // there are no read ports, just write to the buffer. |
| status = buf_write(wp->buf, pk, count); |
| } else { |
| // there are read ports. for each, write and attempt to wake a thread |
| // from the port group or from the read port itself. |
| read_port_t *rp; |
| list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) { |
| if (buf_write(rp->buf, pk, count) < 0) { |
| // buffer full. |
| status = ERR_PARTIAL_WRITE; |
| continue; |
| } |
| |
| int awaken = 0; |
| if (rp->gport) { |
| awaken = wait_queue_wake_one(&rp->gport->wait, false, NO_ERROR); |
| } |
| if (!awaken) { |
| awaken = wait_queue_wake_one(&rp->wait, false, NO_ERROR); |
| } |
| |
| awake_count += awaken; |
| } |
| } |
| |
| THREAD_UNLOCK(state); |
| |
| #if RESCHEDULE_POLICY |
| if (awake_count) |
| thread_yield(); |
| #endif |
| |
| return status; |
| } |
| |
| static inline status_t read_no_lock(read_port_t *rp, lk_time_t timeout, port_result_t *result) |
| { |
| status_t status = buf_read(rp->buf, result); |
| result->ctx = rp->ctx; |
| |
| if (status != ERR_NO_MSG) |
| return status; |
| |
| // early return allows compiler to elide the rest for the group read case. |
| if (!timeout) |
| return ERR_TIMED_OUT; |
| |
| status_t wr = wait_queue_block(&rp->wait, timeout); |
| if (wr != NO_ERROR) |
| return wr; |
| // recursive tail call is usually optimized away with a goto. |
| return read_no_lock(rp, timeout, result); |
| } |
| |
| status_t port_read(port_t port, lk_time_t timeout, port_result_t *result) |
| { |
| if (!port || !result) |
| return ERR_INVALID_ARGS; |
| |
| status_t rc = ERR_GENERIC; |
| read_port_t *rp = (read_port_t *)port; |
| |
| THREAD_LOCK(state); |
| if (rp->magic == READPORT_MAGIC) { |
| // dealing with a single port. |
| rc = read_no_lock(rp, timeout, result); |
| } else if (rp->magic == PORTGROUP_MAGIC) { |
| // dealing with a port group. |
| port_group_t *pg = (port_group_t *)port; |
| do { |
| // read each port with no timeout. |
| // todo: this order is fixed, probably a bad thing. |
| list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) { |
| rc = read_no_lock(rp, 0, result); |
| if (rc != ERR_TIMED_OUT) |
| goto read_exit; |
| } |
| // no data, block on the group waitqueue. |
| rc = wait_queue_block(&pg->wait, timeout); |
| } while (rc == NO_ERROR); |
| } else { |
| // wrong port type. |
| rc = ERR_BAD_HANDLE; |
| } |
| |
| read_exit: |
| THREAD_UNLOCK(state); |
| return rc; |
| } |
| |
| status_t port_destroy(port_t port) |
| { |
| if (!port) |
| return ERR_INVALID_ARGS; |
| |
| write_port_t *wp = (write_port_t *) port; |
| port_buf_t *buf = NULL; |
| |
| THREAD_LOCK(state); |
| if (wp->magic != WRITEPORT_MAGIC_X) { |
| // wrong port type. |
| THREAD_UNLOCK(state); |
| return ERR_BAD_HANDLE; |
| } |
| // remove self from global named ports list. |
| list_delete(&wp->node); |
| |
| if (wp->buf) { |
| // we have no readers. |
| buf = wp->buf; |
| } else { |
| // for each reader: |
| read_port_t *rp; |
| list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) { |
| // wake the read and group ports. |
| wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED); |
| if (rp->gport) { |
| wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED); |
| } |
| // remove self from reader ports. |
| rp->wport = NULL; |
| } |
| } |
| |
| wp->magic = 0; |
| THREAD_UNLOCK(state); |
| |
| free(buf); |
| free(wp); |
| return NO_ERROR; |
| } |
| |
| status_t port_close(port_t port) |
| { |
| if (!port) |
| return ERR_INVALID_ARGS; |
| |
| read_port_t *rp = (read_port_t *) port; |
| port_buf_t *buf = NULL; |
| |
| THREAD_LOCK(state); |
| if (rp->magic == READPORT_MAGIC) { |
| // dealing with a read port. |
| if (rp->wport) { |
| // remove self from write port list and reassign the bufer if last. |
| list_delete(&rp->w_node); |
| if (list_is_empty(&rp->wport->rp_list)) { |
| rp->wport->buf = rp->buf; |
| rp->buf = NULL; |
| } else { |
| buf = rp->buf; |
| } |
| } |
| if (rp->gport) { |
| // remove self from port group list. |
| list_delete(&rp->g_node); |
| } |
| // wake up waiters, the return code is ERR_OBJECT_DESTROYED. |
| wait_queue_destroy(&rp->wait, true); |
| rp->magic = 0; |
| |
| } else if (rp->magic == PORTGROUP_MAGIC) { |
| // dealing with a port group. |
| port_group_t *pg = (port_group_t *) port; |
| // wake up waiters. |
| wait_queue_destroy(&pg->wait, true); |
| // remove self from reader ports. |
| rp = NULL; |
| list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) { |
| rp->gport = NULL; |
| } |
| pg->magic = 0; |
| |
| } else if (rp->magic == WRITEPORT_MAGIC_W) { |
| // dealing with a write port. |
| write_port_t *wp = (write_port_t *) port; |
| // mark it as closed. Now it can be read but not written to. |
| wp->magic = WRITEPORT_MAGIC_X; |
| THREAD_UNLOCK(state); |
| return NO_ERROR; |
| |
| } else { |
| THREAD_UNLOCK(state); |
| return ERR_BAD_HANDLE; |
| } |
| |
| THREAD_UNLOCK(state); |
| |
| free(buf); |
| free(port); |
| return NO_ERROR; |
| } |
| |