From d0bcd6b20c978fe9e98ec2d52e69147832a37af3 Mon Sep 17 00:00:00 2001 From: Vftdan Date: Tue, 13 Aug 2024 23:33:52 +0200 Subject: [PATCH] Initial commit --- .gitignore | 2 + Makefile | 23 +++ defs.h | 16 +++ events.c | 73 ++++++++++ events.h | 50 +++++++ graph.c | 167 ++++++++++++++++++++++ graph.h | 44 ++++++ main.c | 35 +++++ modifiers.h | 78 +++++++++++ nodes/getchar.c | 94 +++++++++++++ nodes/getchar.h | 8 ++ nodes/print.c | 59 ++++++++ nodes/print.h | 8 ++ processing.c | 361 ++++++++++++++++++++++++++++++++++++++++++++++++ processing.h | 49 +++++++ 15 files changed, 1067 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 defs.h create mode 100644 events.c create mode 100644 events.h create mode 100644 graph.c create mode 100644 graph.h create mode 100644 main.c create mode 100644 modifiers.h create mode 100644 nodes/getchar.c create mode 100644 nodes/getchar.h create mode 100644 nodes/print.c create mode 100644 nodes/print.h create mode 100644 processing.c create mode 100644 processing.h diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f0c9b81 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.o +main diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..07c0e22 --- /dev/null +++ b/Makefile @@ -0,0 +1,23 @@ +DEPS = +CFLAGS = -Wall -Wextra +ifdef DEBUG + CFLAGS += -Og -gdwarf-2 + CPPFLAGS += -D DEBUG +else + CFLAGS += -O2 +endif +CPPFLAGS += $(shell pkg-config --cflags $(DEPS)) +LDLIBS += $(shell pkg-config --libs $(DEPS)) +INTERP ?= +MAIN = main +OBJS = main.o events.o processing.o graph.o nodes/getchar.o nodes/print.o + +all: $(MAIN) + +run: $(MAIN) + $(INTERP) ./$(MAIN) + +.PHONY: all run + +$(MAIN): $(OBJS) + $(CC) $(LDFLAGS) $^ $(LOADLIBES) $(LDLIBS) -o $@ diff --git a/defs.h b/defs.h new file mode 100644 index 0000000..b47a6c7 --- /dev/null +++ b/defs.h @@ -0,0 +1,16 @@ +#ifndef DEFS_H_ +#define DEFS_H_ + +#include +#include +#include +#include + +#define containerof(ptr, contype, membpath) ((contype*)(0 ? (void)(((contype*)NULL)->membpath = *(ptr)) : (void)0, ((char *)(ptr)) - offsetof(contype, membpath))) +// Assuming child type has a field for the base type +// So for structs it is usually actual downcast, but for unions it is an upcast +#define DOWNCAST(contype, basename, ptr) containerof(ptr, contype, as_##basename) + +#define DEBUG_PRINT_VALUE(x, fmt) fprintf(stderr, #x " = " fmt "\n", x); fflush(stderr) + +#endif /* end of include guard: DEFS_H_ */ diff --git a/events.c b/events.c new file mode 100644 index 0000000..5190349 --- /dev/null +++ b/events.c @@ -0,0 +1,73 @@ +#include "events.h" + +EventNode +END_EVENTS = { + .prev = &END_EVENTS, + .next = &END_EVENTS, + .position = NULL, + .input_index = 0, +}; + +size_t +event_replicate(EventNode * source, size_t count) +{ + size_t i; + for (i = 0; i < count; ++i) { + EventNode * replica = malloc(sizeof(EventNode)); + if (!replica) { + break; + } + replica->position = NULL; + replica->input_index = 0; + replica->data = source->data; + replica->data.modifiers = modifier_set_copy(source->data.modifiers); + replica->prev = source; + replica->next = source->next; + source->next->next->prev = replica; + source->next = replica; + } + return i; +} + +EventNode * +event_create(const EventData * content) +{ + EventNode * event = calloc(1, sizeof(EventNode)); + if (content) { + event->data = *content; + event->data.modifiers = modifier_set_copy(content->modifiers); + } else { + clock_gettime(CLOCK_MONOTONIC, &event->data.time); + } + struct timespec self_time = event->data.time; + EventNode ** list_pos = &LAST_EVENT; + FOREACH_EVENT_DESC(other) { + struct timespec other_time = other->data.time; + if (self_time.tv_sec > other_time.tv_sec) { + break; + } + if (self_time.tv_sec == other_time.tv_sec) { + if (self_time.tv_nsec >= other_time.tv_nsec) { + break; + } + } + list_pos = &other->next->prev; + } + EventNode * prev = *list_pos; + event->next = prev->next; + event->prev = prev; + prev->next->prev = event; + prev->next = event; + return event; +} + +void +event_destroy(EventNode * self) +{ + modifier_set_destruct(&self->data.modifiers); + self->next->prev = self->prev; + self->prev->next = self->next; + self->prev = NULL; + self->next = NULL; + free(self); +} diff --git a/events.h b/events.h new file mode 100644 index 0000000..5e847d6 --- /dev/null +++ b/events.h @@ -0,0 +1,50 @@ +#ifndef EVENTS_H_ +#define EVENTS_H_ + +#include +#include "defs.h" +#include "modifiers.h" + +typedef uint32_t EventNamespace; + +typedef struct { + EventNamespace ns; + uint32_t value; +} EventCode; + +typedef struct { + EventCode code; + uint32_t ttl; + int32_t priority; + int64_t payload; + ModifierSet modifiers; + struct timespec time; +} EventData; + +typedef struct event_position_base EventPositionBase; +typedef struct event_node EventNode; + +struct event_position_base { + bool (*handle_event) (EventPositionBase * self, EventNode * event); // If returns false, the scheduler should not rewind back to the start. Must return true if any events were deleted + bool waiting_new_event; // Skip from handling until it is set to true. Assigning this position to a event should unset this flag +}; + +struct event_node { + EventNode *prev, *next; + EventPositionBase *position; + size_t input_index; + EventData data; +}; + +extern EventNode END_EVENTS; +#define FIRST_EVENT (END_EVENTS.next) +#define LAST_EVENT (END_EVENTS.prev) +#define FOREACH_EVENT(ev) for (EventNode *ev = FIRST_EVENT; ev && (ev != &END_EVENTS); ev = ev->next) +#define FOREACH_EVENT_DESC(ev) for (EventNode *ev = LAST_EVENT; ev && (ev != &END_EVENTS); ev = ev->prev) + +// Creates count replicas after the source event in the list, position is NULL, returns the number of successfully created replicas +size_t event_replicate(EventNode * source, size_t count); +EventNode * event_create(const EventData * content); +void event_destroy(EventNode * self); + +#endif /* end of include guard: EVENTS_H_ */ diff --git a/graph.c b/graph.c new file mode 100644 index 0000000..8a17aaf --- /dev/null +++ b/graph.c @@ -0,0 +1,167 @@ +#include +#include "graph.h" + +static void +graph_channel_list_resize(GraphChannelList * lst, size_t target) +{ + if (target > lst->length) { + GraphChannel **new_elements = reallocarray(lst->elements, target, sizeof(GraphChannel*)); + if (!new_elements) { + return; + } + memset(&new_elements[lst->length], 0, (target - lst->length) * sizeof(GraphChannel*)); + lst->elements = new_elements; + } + lst->length = target; +} + +inline static void +graph_channel_list_ensure(GraphChannelList * lst, size_t idx) +{ + size_t min_length = idx + 1; + if (lst->length < min_length) { + graph_channel_list_resize(lst, min_length); + } + if (lst->length < min_length) { + perror("Failed to resize graph channel list"); + exit(1); + } +} + +static bool +channel_handle_event(EventPositionBase * self, EventNode * event) +{ + GraphChannel *ch = DOWNCAST(GraphChannel, EventPositionBase, self); + if (event->data.ttl == 0) { + event_destroy(event); + return true; + } + event->data.ttl--; + if (event->data.ttl == 0) { + event_destroy(event); + return true; + } + GraphNode * target = ch->end; + if (!target) { + event_destroy(event); + return true; + } + event->position = &target->as_EventPositionBase; + event->input_index = ch->idx_end; + target->as_EventPositionBase.waiting_new_event = false; + return false; // Continue processing events +} + +void +graph_channel_init(GraphChannel * ch, GraphNode * start, size_t start_idx, GraphNode * end, size_t end_idx) +{ + if (start) { + graph_channel_list_ensure(&start->outputs, start_idx); + GraphChannel ** old_ch = &start->outputs.elements[start_idx]; + if (*old_ch) { + if ((*old_ch)->start == start && (*old_ch)->idx_start == start_idx) { + (*old_ch)->start = NULL; + // TODO on orphaned check + } + } + *old_ch = ch; + } + + if (end) { + graph_channel_list_ensure(&end->inputs, end_idx); + GraphChannel ** old_ch = &end->inputs.elements[end_idx]; + if (*old_ch) { + if ((*old_ch)->end == end && (*old_ch)->idx_end == end_idx) { + (*old_ch)->end = NULL; + // TODO on orphaned check + } + } + *old_ch = ch; + } + + ch->start = start; + ch->end = end; + ch->idx_start = start_idx; + ch->idx_end = end_idx; + ch->as_EventPositionBase.handle_event = &channel_handle_event; + ch->as_EventPositionBase.waiting_new_event = false; +} + +GraphNode * +graph_node_new(GraphNodeSpecification * spec) +{ + if (!spec || !spec->create) { + return NULL; + } + return spec->create(spec); +} + +void +graph_node_delete(GraphNode * self) +{ + if (!self) { + return; + } + + for (size_t i = 0; i < self->inputs.length; ++i) { + GraphChannel *ch = self->inputs.elements[i]; + if (ch && ch->end == self) { + ch->end = NULL; + if (!ch->start && !ch->end) { + // free(ch); // TODO on orphaned + } + } + self->inputs.elements[i] = NULL; + } + + for (size_t i = 0; i < self->outputs.length; ++i) { + GraphChannel *ch = self->outputs.elements[i]; + if (ch && ch->start == self) { + ch->start = NULL; + if (!ch->start && !ch->end) { + // free(ch); // TODO on orphaned + } + } + self->outputs.elements[i] = NULL; + } + + graph_channel_list_deinit(&self->inputs); + graph_channel_list_deinit(&self->outputs); + + GraphNodeSpecification *spec = self->specification; + if (!spec || !spec->destroy) { + return; + } + + spec->destroy(spec, self); +} + +void +graph_node_register_io(GraphNode * self, ProcessingState * state) +{ + if (!self) { + return; + } + GraphNodeSpecification *spec = self->specification; + if (!spec || !spec->register_io) { + return; + } + spec->register_io(spec, self, state); +} + +void +graph_channel_list_init(GraphChannelList * lst) +{ + lst->length = 0; + lst->elements = NULL; +} + +void +graph_channel_list_deinit(GraphChannelList * lst) +{ + if (lst->elements) { + free(lst->elements); + lst->elements = NULL; + } + lst->length = 0; +} diff --git a/graph.h b/graph.h new file mode 100644 index 0000000..3f8c5e5 --- /dev/null +++ b/graph.h @@ -0,0 +1,44 @@ +#ifndef GRAPH_H_ +#define GRAPH_H_ + +#include "events.h" +#include "processing.h" + +typedef struct graph_node GraphNode; +typedef struct graph_channel GraphChannel; +typedef struct graph_node_specification GraphNodeSpecification; + +typedef struct { + size_t length; + GraphChannel ** elements; +} GraphChannelList; + +#define EMPTY_GRAPH_CHANNEL_LIST ((GraphChannelList) {.length = 0, .elements = NULL}) + +struct graph_node { + EventPositionBase as_EventPositionBase; + GraphNodeSpecification * specification; + GraphChannelList inputs, outputs; +}; + +struct graph_channel { + EventPositionBase as_EventPositionBase; + GraphNode *start, *end; + size_t idx_start, idx_end; +}; + +struct graph_node_specification { + GraphNode * (*create)(GraphNodeSpecification * self); + void (*destroy)(GraphNodeSpecification * self, GraphNode * target); + void (*register_io)(GraphNodeSpecification * self, GraphNode * target, ProcessingState * state); + char *name; +}; + +void graph_channel_init(GraphChannel * ch, GraphNode * start, size_t start_idx, GraphNode * end, size_t end_idx); +GraphNode *graph_node_new(GraphNodeSpecification * spec); +void graph_node_delete(GraphNode * self); +void graph_node_register_io(GraphNode * self, ProcessingState * state); +void graph_channel_list_init(GraphChannelList * lst); +void graph_channel_list_deinit(GraphChannelList * lst); + +#endif /* end of include guard: GRAPH_H_ */ diff --git a/main.c b/main.c new file mode 100644 index 0000000..2832591 --- /dev/null +++ b/main.c @@ -0,0 +1,35 @@ +#include "processing.h" +#include "nodes/print.h" +#include "nodes/getchar.h" + +int +main(int argc, char ** argv) +{ + (void)argc; + (void)argv; + + ProcessingState state = (ProcessingState) { + .wait_delay = NULL, + }; + clock_gettime(CLOCK_MONOTONIC, &state.reached_time); + io_subscription_list_init(&state.wait_input, 5); + io_subscription_list_init(&state.wait_output, 5); + + GraphNode * input_node = graph_node_new(&nodespec_getchar); + GraphNode * output_node = graph_node_new(&nodespec_print); + GraphChannel chan; + graph_channel_init(&chan, input_node, 0, output_node, 0); + graph_node_register_io(input_node, &state); + graph_node_register_io(output_node, &state); + + while (true) { + process_iteration(&state); + } + + graph_node_delete(output_node); + graph_node_delete(input_node); + + io_subscription_list_deinit(&state.wait_output); + io_subscription_list_deinit(&state.wait_input); + return 0; +} diff --git a/modifiers.h b/modifiers.h new file mode 100644 index 0000000..58d7f52 --- /dev/null +++ b/modifiers.h @@ -0,0 +1,78 @@ +#ifndef MODIFIERS_H_ +#define MODIFIERS_H_ + +#include "defs.h" +#include + +typedef struct { + size_t byte_length; + uint8_t *bits; +} ModifierSet; + +#define EMPTY_MODIFIER_SET ((ModifierSet) {.byte_length = 0, .bits = NULL}) + +__attribute__((unused)) inline static ModifierSet +modifier_set_copy(const ModifierSet old) +{ + ModifierSet result = old; + result.bits = malloc(result.byte_length); + if (!result.bits) { + result.byte_length = 0; + return result; + } + memcpy(result.bits, old.bits, result.byte_length); + return result; +} + +__attribute__((unused)) inline static void +modifier_set_destruct(ModifierSet * old) +{ + if (old->bits) { + free(old->bits); + } + old->bits = NULL; + old->byte_length = 0; +} + +__attribute__((unused)) inline static bool +modifier_set_extend(ModifierSet * old, size_t new_byte_length) +{ + if (new_byte_length > old->byte_length) { + uint8_t *bits = realloc(old->bits, new_byte_length); + if (!bits) { + return false; + } + memset(bits + old->byte_length, 0, new_byte_length - old->byte_length); + old->bits = bits; + old->byte_length = new_byte_length; + } + return true; +} + +__attribute__((unused)) inline static void +modifier_set_set_from(ModifierSet * target, const ModifierSet source) +{ + modifier_set_extend(target, source.byte_length); + for (size_t i = 0; i < target->byte_length; ++i) { + target->bits[i] |= source.bits[i]; + } +} + +__attribute__((unused)) inline static void +modifier_set_unset_from(ModifierSet * target, const ModifierSet source) +{ + for (size_t i = 0; i < target->byte_length; ++i) { + target->bits[i] &= ~source.bits[i]; + } +} + +__attribute__((unused)) inline static void +modifier_set_toggle_from(ModifierSet * target, const ModifierSet source) +{ + modifier_set_extend(target, source.byte_length); + for (size_t i = 0; i < target->byte_length; ++i) { + target->bits[i] ^= source.bits[i]; + } +} + +#endif /* end of include guard: MODIFIERS_H_ */ diff --git a/nodes/getchar.c b/nodes/getchar.c new file mode 100644 index 0000000..f738d11 --- /dev/null +++ b/nodes/getchar.c @@ -0,0 +1,94 @@ +#include +#include +#include "getchar.h" +#include "../processing.h" + +typedef struct { + GraphNode as_GraphNode; + IOHandling subscription; +} GetcharGraphNode; + +static void +handle_io(EventPositionBase * self, int fd, bool is_output) +{ + (void) is_output; + GetcharGraphNode * node = DOWNCAST(GetcharGraphNode, GraphNode, DOWNCAST(GraphNode, EventPositionBase, self)); + char buf[1]; + ssize_t status = read(fd, buf, 1); + if (status < 0) { + perror("Failed to read character"); + return; + } + EventData data = { + .code = { + .ns = 0, + .value = 1, + }, + .ttl = 100, + .priority = 10, + .payload = buf[0], + .modifiers = EMPTY_MODIFIER_SET, + }; + clock_gettime(CLOCK_MONOTONIC, &data.time); + if (status == 0) { + node->subscription.enabled = false; + data.code.value = 2; + data.payload = 0; + } + for (size_t i = 0; i < node->as_GraphNode.outputs.length; ++i) { + EventNode *ev = event_create(&data); + if (!ev) { + perror("Failed to create event"); + break; + } + ev->position = &node->as_GraphNode.outputs.elements[i]->as_EventPositionBase; + } +} + +static GraphNode * +create(GraphNodeSpecification * spec) +{ + GetcharGraphNode * node = calloc(1, sizeof(GetcharGraphNode)); + if (!node) { + return NULL; + } + *node = (GetcharGraphNode) { + .as_GraphNode = { + .as_EventPositionBase = { + .handle_event = NULL, + .waiting_new_event = false, + }, + .specification = spec, + .inputs = EMPTY_GRAPH_CHANNEL_LIST, + .outputs = EMPTY_GRAPH_CHANNEL_LIST, + }, + .subscription = { + .self = &node->as_GraphNode.as_EventPositionBase, + .handle_io = handle_io, + .enabled = true, + }, + }; + return &node->as_GraphNode; +} + +static void destroy +(GraphNodeSpecification * self, GraphNode * target) +{ + (void) self; + free(target); +} + +static void +register_io(GraphNodeSpecification * self, GraphNode * target, ProcessingState * state) +{ + (void) self; + GetcharGraphNode * node = DOWNCAST(GetcharGraphNode, GraphNode, target); + io_subscription_list_add(&state->wait_input, fileno(stdin), &node->subscription); +} + +GraphNodeSpecification nodespec_getchar = (GraphNodeSpecification) { + .create = &create, + .destroy = &destroy, + .register_io = ®ister_io, + .name = "getchar", +}; diff --git a/nodes/getchar.h b/nodes/getchar.h new file mode 100644 index 0000000..d25852f --- /dev/null +++ b/nodes/getchar.h @@ -0,0 +1,8 @@ +#ifndef NODES_GETCHAR_H_ +#define NODES_GETCHAR_H_ + +#include "../graph.h" + +extern GraphNodeSpecification nodespec_getchar; + +#endif /* end of include guard: NODES_GETCHAR_H_ */ diff --git a/nodes/print.c b/nodes/print.c new file mode 100644 index 0000000..88cb9ac --- /dev/null +++ b/nodes/print.c @@ -0,0 +1,59 @@ +#include +#include "print.h" + +static bool +handle_event(EventPositionBase * self, EventNode * event) +{ +#define PRINT_FIELD(fmt, path) printf("%s = " fmt "\n", #path, data.path) + (void) self; + EventData data = event->data; + printf("Event from connector %ld:\n", event->input_index); + PRINT_FIELD("%d", code.ns); + PRINT_FIELD("%d", code.value); + PRINT_FIELD("%d", ttl); + PRINT_FIELD("%d", priority); + PRINT_FIELD("%ld", payload); + printf("modifiers = "); + for (ssize_t i = data.modifiers.byte_length - 1; i >= 0; --i) { + printf("%02x", data.modifiers.bits[i]); + } + printf("\n"); + printf("time = %ld.%09ld\n", data.time.tv_sec, data.time.tv_nsec); + printf("---\n\n"); + event_destroy(event); + return true; +#undef PRINT_FIELD +} + +static GraphNode * +create(GraphNodeSpecification * spec) +{ + GraphNode * node = calloc(1, sizeof(GraphNode)); + if (!node) { + return node; + } + *node = (GraphNode) { + .as_EventPositionBase = { + .handle_event = &handle_event, + .waiting_new_event = false, + }, + .specification = spec, + .inputs = EMPTY_GRAPH_CHANNEL_LIST, + .outputs = EMPTY_GRAPH_CHANNEL_LIST, + }; + return node; +} + +static void destroy +(GraphNodeSpecification * self, GraphNode * target) +{ + (void) self; + free(target); +} + +GraphNodeSpecification nodespec_print = (GraphNodeSpecification) { + .create = &create, + .destroy = &destroy, + .register_io = NULL, + .name = "print", +}; diff --git a/nodes/print.h b/nodes/print.h new file mode 100644 index 0000000..e90c49e --- /dev/null +++ b/nodes/print.h @@ -0,0 +1,8 @@ +#ifndef NODES_PRINT_H_ +#define NODES_PRINT_H_ + +#include "../graph.h" + +extern GraphNodeSpecification nodespec_print; + +#endif /* end of include guard: NODES_PRINT_H_ */ diff --git a/processing.c b/processing.c new file mode 100644 index 0000000..734195e --- /dev/null +++ b/processing.c @@ -0,0 +1,361 @@ +#include +#include +#include +#include +#include "processing.h" + +static bool +io_subscription_list_extend(IOSubscriptionList * lst) +{ + size_t capacity = lst->capacity; + capacity = capacity + (capacity >> 1) + 1; + + int * new_fds = reallocarray(lst->fds, capacity, sizeof(int)); + if (!new_fds) { + return false; + } + lst->fds = new_fds; + + IOHandling ** new_subscribers = reallocarray(lst->subscribers, capacity, sizeof(IOHandling*)); + if (!new_subscribers) { + return false; + } + lst->subscribers = new_subscribers; + + lst->capacity = capacity; + return true; +} + +void +io_subscription_list_init(IOSubscriptionList * lst, size_t capacity) +{ + IOSubscriptionList result = { + .length = 0, + .capacity = 0, + .fds = NULL, + .subscribers = NULL, + }; + result.fds = calloc(capacity, sizeof(int)); + result.subscribers = calloc(capacity, sizeof(IOHandling*)); + if (!result.fds || !result.subscribers) + capacity = 0; + result.capacity = capacity; + *lst = result; +} + +void +io_subscription_list_deinit(IOSubscriptionList * lst) +{ + if (lst->fds) + free(lst->fds); + if (lst->subscribers) + free(lst->subscribers); + *lst = (IOSubscriptionList) { + .length = 0, + .capacity = 0, + .fds = NULL, + .subscribers = NULL, + }; +} + +void +io_subscription_list_add(IOSubscriptionList * lst, int fd, IOHandling *subscriber) +{ + if (lst->length >= lst->capacity) { + if (!io_subscription_list_extend(lst)) { + perror("Failed to extend io subscription list"); + exit(1); + } + } + assert(lst->length < lst->capacity); + size_t i = lst->length; + lst->fds[i] = fd; + lst->subscribers[i] = subscriber; + lst->length = i + 1; +} + +static int +populate_fd_set(fd_set * fds, IOSubscriptionList * src, int old_max_fd) +{ + FD_ZERO(fds); + for (size_t i = 0; i < src->length; ++i) { + IOHandling *subscriber = src->subscribers[i]; + if (!subscriber) { + continue; + } + if (!subscriber->enabled) { + continue; + } + int fd = src->fds[i]; + if (fd > old_max_fd) { + old_max_fd = fd; + } + FD_SET(fd, fds); + } + return old_max_fd; +} + +static void +run_io_handlers(fd_set * fds, IOSubscriptionList * subs, bool arg) +{ + for (size_t i = 0; i < subs->length; ++i) { + int fd = subs->fds[i]; + if (FD_ISSET(fd, fds)) { + IOHandling *subscriber = subs->subscribers[i]; + if (!subscriber) { + continue; + } + if (!subscriber->enabled) { + continue; + } + void (*callback) (EventPositionBase*, int, bool) = subscriber->handle_io; + if (callback) { + callback(subscriber->self, fd, arg); + } + } + } +} + +bool +process_io(ProcessingState * state, const struct timespec * timeout) +{ + int max_fd = 0; + fd_set readfds, writefds; + + max_fd = populate_fd_set(&readfds, &state->wait_input, max_fd); + max_fd = populate_fd_set(&writefds, &state->wait_output, max_fd); + + ++max_fd; + int ready = pselect(max_fd, &readfds, &writefds, NULL, timeout, NULL); + + if (ready < 0) { + FD_ZERO(&readfds); + FD_ZERO(&writefds); + return false; + } + + run_io_handlers(&readfds, &state->wait_input, false); + run_io_handlers(&writefds, &state->wait_output, true); + + FD_ZERO(&readfds); + FD_ZERO(&writefds); + return true; +} + +bool +schedule_delay(ProcessingState * state, EventPositionBase * target, void (*callback) (EventPositionBase*, void*, const struct timespec*), const struct timespec * time) +{ + DelayList **next = &state->wait_delay; + while (*next) { + struct timespec next_time = (*next)->time; + if (next_time.tv_sec > time->tv_sec) { + break; + } + if (next_time.tv_sec == time->tv_sec) { + if (next_time.tv_nsec > time->tv_nsec) { + break; + } + } + next = &((*next)->next); + } + + DelayList * current = malloc(sizeof(DelayList)); + if (!current) { + return false; + } + + *current = (DelayList) { + .callback = callback, + .target = target, + .next = *next, + .time = *time, + }; + *next = current; + return true; +} + +static const struct timespec ZERO_TS = { + .tv_sec = 0, + .tv_nsec = 0, +}; + +inline static void +fix_nsec(struct timespec * ts) +{ + if (ts->tv_nsec < 0) { + ts->tv_nsec += 1000000000; + ts->tv_sec -= 1; + } +} + +static bool +process_single_scheduled(ProcessingState * state, const struct timespec extern_time) +{ + if (!state->wait_delay) { + return false; + } + struct timespec next_scheduled_time = state->wait_delay->time; // abs + if (next_scheduled_time.tv_sec > extern_time.tv_sec) { + return false; + } else if (next_scheduled_time.tv_sec == extern_time.tv_sec) { + if (next_scheduled_time.tv_nsec > extern_time.tv_nsec) { + return false; + } + } + DelayList next_scheduled = *state->wait_delay; + free(state->wait_delay); + state->wait_delay = next_scheduled.next; + + if (next_scheduled.callback) { + next_scheduled.callback( + next_scheduled.target, + next_scheduled.closure, + &next_scheduled_time + ); + } + return true; +} + +static bool +process_events_until(ProcessingState * state, const struct timespec * max_time) +{ + bool stable = true; + int32_t next_priority = INT32_MIN; + state->has_future_events = false; + + FOREACH_EVENT(ev) { + if (max_time) { + struct timespec ev_time = ev->data.time; + if (ev_time.tv_sec > max_time->tv_sec) { + // stable = false; + state->has_future_events = true; + break; + } else if (ev_time.tv_sec == max_time->tv_sec) { + if (ev_time.tv_nsec > max_time->tv_nsec) { + // stable = false; + state->has_future_events = true; + break; + } + } + } + + if (ev->data.priority > next_priority) { + next_priority = ev->data.priority; + } + } + + while (next_priority > INT32_MIN) { + state->pass_priority = next_priority; + next_priority = INT32_MIN; + FOREACH_EVENT(ev) { + int32_t ev_priority = ev->data.priority; + if (ev_priority < state->pass_priority) { + if (ev_priority > next_priority) { + next_priority = ev_priority; + } + } else if (ev_priority > state->pass_priority) { + continue; + } + + EventPositionBase *position = ev->position; + if (!position) { + continue; + } + if (position->waiting_new_event) { + continue; + } + bool (*handler) (EventPositionBase*, EventNode*) = position->handle_event; + if (!handler) { + continue; + } + + if (max_time) { + struct timespec ev_time = ev->data.time; + if (ev_time.tv_sec > max_time->tv_sec) { + state->has_future_events = true; + break; + } else if (ev_time.tv_sec == max_time->tv_sec) { + if (ev_time.tv_nsec > max_time->tv_nsec) { + state->has_future_events = true; + break; + } + } + } + + stable = false; + bool should_rewind = handler(position, ev); + if (should_rewind) { + // ev = &END_EVENTS; // Will be set to FIRST_EVENT by loop increment + next_priority = INT32_MIN; // Break out of the outermost loop + break; + } + } + } + + state->reached_time = *max_time; + FOREACH_EVENT(ev) { + state->reached_time = ev->data.time; + break; + } + + return !stable; +} + +void +process_iteration(ProcessingState * state) +{ + struct timespec extern_time; + if (clock_gettime(CLOCK_MONOTONIC, &extern_time) < 0) { + perror("Failed to get time"); + exit(1); + } + + // late_by.tv_sec = extern_time.tv_sec - state->reached_time.tv_sec; + // late_by.tv_nsec = extern_time.tv_nsec - state->reached_time.tv_nsec; + // fix_nsec(&late_by); + + struct timespec next_scheduled_delay; + const struct timespec *max_io_timeout = NULL; + if (state->has_future_events) { + max_io_timeout = &ZERO_TS; + } else { + if (state->wait_delay) { + next_scheduled_delay = state->wait_delay->time; // abs + next_scheduled_delay.tv_sec -= extern_time.tv_sec; + next_scheduled_delay.tv_nsec -= extern_time.tv_nsec; + fix_nsec(&next_scheduled_delay); + if (next_scheduled_delay.tv_sec < 0) { + max_io_timeout = &ZERO_TS; + } else { + max_io_timeout = &next_scheduled_delay; + } + } + } + + // FIXME reason about timeouts + process_io(state, max_io_timeout); + // process_io(state, &ZERO_TS); + + while (true) { + bool had_scheduled = process_single_scheduled(state, extern_time); + const struct timespec *max_event_time = &extern_time; + if (state->wait_delay) { + struct timespec next_scheduled_time = state->wait_delay->time; + bool use_scheduled = false; + if (!use_scheduled) { + use_scheduled = next_scheduled_time.tv_sec > extern_time.tv_sec; + } + if (!use_scheduled) { + use_scheduled = next_scheduled_time.tv_nsec > extern_time.tv_nsec; + } + if (use_scheduled) { + max_event_time = &state->wait_delay->time; + } + } + bool had_events = process_events_until(state, max_event_time); + if (!had_scheduled && !had_events) { + break; + } + process_io(state, &ZERO_TS); + } +} diff --git a/processing.h b/processing.h new file mode 100644 index 0000000..8452dc8 --- /dev/null +++ b/processing.h @@ -0,0 +1,49 @@ +#ifndef PROCESSING_H_ +#define PROCESSING_H_ + +#include +#include "events.h" + +typedef struct io_handling IOHandling; + +// no virtual multiinheritance +struct io_handling { + EventPositionBase * self; + void (*handle_io) (EventPositionBase * self, int fd, bool is_output); + bool enabled; +}; + +typedef struct { + size_t length; + size_t capacity; + int *fds; + IOHandling **subscribers; +} IOSubscriptionList; + +typedef struct delay_list DelayList; + +struct delay_list { + void (*callback) (EventPositionBase * target, void * closure, const struct timespec * time); + EventPositionBase *target; + void *closure; + DelayList *next; + struct timespec time; +}; + +typedef struct { + IOSubscriptionList wait_input, wait_output; + DelayList *wait_delay; + struct timespec reached_time; + int32_t pass_priority; + bool has_future_events; +} ProcessingState; + +void io_subscription_list_init(IOSubscriptionList * lst, size_t capacity); +void io_subscription_list_deinit(IOSubscriptionList * lst); +void io_subscription_list_add(IOSubscriptionList * lst, int fd, IOHandling *subscriber); + +bool schedule_delay(ProcessingState * state, EventPositionBase * target, void (*callback) (EventPositionBase*, void*, const struct timespec*), const struct timespec * time); +bool process_io(ProcessingState * state, const struct timespec * timeout); +void process_iteration(ProcessingState * state); + +#endif /* end of include guard: PROCESSING_H_ */