commit d0bcd6b20c978fe9e98ec2d52e69147832a37af3 Author: Vftdan Date: Tue Aug 13 23:33:52 2024 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f0c9b81 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.o +main diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..07c0e22 --- /dev/null +++ b/Makefile @@ -0,0 +1,23 @@ +DEPS = +CFLAGS = -Wall -Wextra +ifdef DEBUG + CFLAGS += -Og -gdwarf-2 + CPPFLAGS += -D DEBUG +else + CFLAGS += -O2 +endif +CPPFLAGS += $(shell pkg-config --cflags $(DEPS)) +LDLIBS += $(shell pkg-config --libs $(DEPS)) +INTERP ?= +MAIN = main +OBJS = main.o events.o processing.o graph.o nodes/getchar.o nodes/print.o + +all: $(MAIN) + +run: $(MAIN) + $(INTERP) ./$(MAIN) + +.PHONY: all run + +$(MAIN): $(OBJS) + $(CC) $(LDFLAGS) $^ $(LOADLIBES) $(LDLIBS) -o $@ diff --git a/defs.h b/defs.h new file mode 100644 index 0000000..b47a6c7 --- /dev/null +++ b/defs.h @@ -0,0 +1,16 @@ +#ifndef DEFS_H_ +#define DEFS_H_ + +#include +#include +#include +#include + +#define containerof(ptr, contype, membpath) ((contype*)(0 ? (void)(((contype*)NULL)->membpath = *(ptr)) : (void)0, ((char *)(ptr)) - offsetof(contype, membpath))) +// Assuming child type has a field for the base type +// So for structs it is usually actual downcast, but for unions it is an upcast +#define DOWNCAST(contype, basename, ptr) containerof(ptr, contype, as_##basename) + +#define DEBUG_PRINT_VALUE(x, fmt) fprintf(stderr, #x " = " fmt "\n", x); fflush(stderr) + +#endif /* end of include guard: DEFS_H_ */ diff --git a/events.c b/events.c new file mode 100644 index 0000000..5190349 --- /dev/null +++ b/events.c @@ -0,0 +1,73 @@ +#include "events.h" + +EventNode +END_EVENTS = { + .prev = &END_EVENTS, + .next = &END_EVENTS, + .position = NULL, + .input_index = 0, +}; + +size_t +event_replicate(EventNode * source, size_t count) +{ + size_t i; + for (i = 0; i < count; ++i) { + EventNode * replica = malloc(sizeof(EventNode)); + if (!replica) { + break; + } + replica->position = NULL; + replica->input_index = 0; + replica->data = source->data; + replica->data.modifiers = modifier_set_copy(source->data.modifiers); + replica->prev = source; + replica->next = source->next; + source->next->next->prev = replica; + source->next = replica; + } + return i; +} + +EventNode * +event_create(const EventData * content) +{ + EventNode * event = calloc(1, sizeof(EventNode)); + if (content) { + event->data = *content; + event->data.modifiers = modifier_set_copy(content->modifiers); + } else { + clock_gettime(CLOCK_MONOTONIC, &event->data.time); + } + struct timespec self_time = event->data.time; + EventNode ** list_pos = &LAST_EVENT; + FOREACH_EVENT_DESC(other) { + struct timespec other_time = other->data.time; + if (self_time.tv_sec > other_time.tv_sec) { + break; + } + if (self_time.tv_sec == other_time.tv_sec) { + if (self_time.tv_nsec >= other_time.tv_nsec) { + break; + } + } + list_pos = &other->next->prev; + } + EventNode * prev = *list_pos; + event->next = prev->next; + event->prev = prev; + prev->next->prev = event; + prev->next = event; + return event; +} + +void +event_destroy(EventNode * self) +{ + modifier_set_destruct(&self->data.modifiers); + self->next->prev = self->prev; + self->prev->next = self->next; + self->prev = NULL; + self->next = NULL; + free(self); +} diff --git a/events.h b/events.h new file mode 100644 index 0000000..5e847d6 --- /dev/null +++ b/events.h @@ -0,0 +1,50 @@ +#ifndef EVENTS_H_ +#define EVENTS_H_ + +#include +#include "defs.h" +#include "modifiers.h" + +typedef uint32_t EventNamespace; + +typedef struct { + EventNamespace ns; + uint32_t value; +} EventCode; + +typedef struct { + EventCode code; + uint32_t ttl; + int32_t priority; + int64_t payload; + ModifierSet modifiers; + struct timespec time; +} EventData; + +typedef struct event_position_base EventPositionBase; +typedef struct event_node EventNode; + +struct event_position_base { + bool (*handle_event) (EventPositionBase * self, EventNode * event); // If returns false, the scheduler should not rewind back to the start. Must return true if any events were deleted + bool waiting_new_event; // Skip from handling until it is set to true. Assigning this position to a event should unset this flag +}; + +struct event_node { + EventNode *prev, *next; + EventPositionBase *position; + size_t input_index; + EventData data; +}; + +extern EventNode END_EVENTS; +#define FIRST_EVENT (END_EVENTS.next) +#define LAST_EVENT (END_EVENTS.prev) +#define FOREACH_EVENT(ev) for (EventNode *ev = FIRST_EVENT; ev && (ev != &END_EVENTS); ev = ev->next) +#define FOREACH_EVENT_DESC(ev) for (EventNode *ev = LAST_EVENT; ev && (ev != &END_EVENTS); ev = ev->prev) + +// Creates count replicas after the source event in the list, position is NULL, returns the number of successfully created replicas +size_t event_replicate(EventNode * source, size_t count); +EventNode * event_create(const EventData * content); +void event_destroy(EventNode * self); + +#endif /* end of include guard: EVENTS_H_ */ diff --git a/graph.c b/graph.c new file mode 100644 index 0000000..8a17aaf --- /dev/null +++ b/graph.c @@ -0,0 +1,167 @@ +#include +#include "graph.h" + +static void +graph_channel_list_resize(GraphChannelList * lst, size_t target) +{ + if (target > lst->length) { + GraphChannel **new_elements = reallocarray(lst->elements, target, sizeof(GraphChannel*)); + if (!new_elements) { + return; + } + memset(&new_elements[lst->length], 0, (target - lst->length) * sizeof(GraphChannel*)); + lst->elements = new_elements; + } + lst->length = target; +} + +inline static void +graph_channel_list_ensure(GraphChannelList * lst, size_t idx) +{ + size_t min_length = idx + 1; + if (lst->length < min_length) { + graph_channel_list_resize(lst, min_length); + } + if (lst->length < min_length) { + perror("Failed to resize graph channel list"); + exit(1); + } +} + +static bool +channel_handle_event(EventPositionBase * self, EventNode * event) +{ + GraphChannel *ch = DOWNCAST(GraphChannel, EventPositionBase, self); + if (event->data.ttl == 0) { + event_destroy(event); + return true; + } + event->data.ttl--; + if (event->data.ttl == 0) { + event_destroy(event); + return true; + } + GraphNode * target = ch->end; + if (!target) { + event_destroy(event); + return true; + } + event->position = &target->as_EventPositionBase; + event->input_index = ch->idx_end; + target->as_EventPositionBase.waiting_new_event = false; + return false; // Continue processing events +} + +void +graph_channel_init(GraphChannel * ch, GraphNode * start, size_t start_idx, GraphNode * end, size_t end_idx) +{ + if (start) { + graph_channel_list_ensure(&start->outputs, start_idx); + GraphChannel ** old_ch = &start->outputs.elements[start_idx]; + if (*old_ch) { + if ((*old_ch)->start == start && (*old_ch)->idx_start == start_idx) { + (*old_ch)->start = NULL; + // TODO on orphaned check + } + } + *old_ch = ch; + } + + if (end) { + graph_channel_list_ensure(&end->inputs, end_idx); + GraphChannel ** old_ch = &end->inputs.elements[end_idx]; + if (*old_ch) { + if ((*old_ch)->end == end && (*old_ch)->idx_end == end_idx) { + (*old_ch)->end = NULL; + // TODO on orphaned check + } + } + *old_ch = ch; + } + + ch->start = start; + ch->end = end; + ch->idx_start = start_idx; + ch->idx_end = end_idx; + ch->as_EventPositionBase.handle_event = &channel_handle_event; + ch->as_EventPositionBase.waiting_new_event = false; +} + +GraphNode * +graph_node_new(GraphNodeSpecification * spec) +{ + if (!spec || !spec->create) { + return NULL; + } + return spec->create(spec); +} + +void +graph_node_delete(GraphNode * self) +{ + if (!self) { + return; + } + + for (size_t i = 0; i < self->inputs.length; ++i) { + GraphChannel *ch = self->inputs.elements[i]; + if (ch && ch->end == self) { + ch->end = NULL; + if (!ch->start && !ch->end) { + // free(ch); // TODO on orphaned + } + } + self->inputs.elements[i] = NULL; + } + + for (size_t i = 0; i < self->outputs.length; ++i) { + GraphChannel *ch = self->outputs.elements[i]; + if (ch && ch->start == self) { + ch->start = NULL; + if (!ch->start && !ch->end) { + // free(ch); // TODO on orphaned + } + } + self->outputs.elements[i] = NULL; + } + + graph_channel_list_deinit(&self->inputs); + graph_channel_list_deinit(&self->outputs); + + GraphNodeSpecification *spec = self->specification; + if (!spec || !spec->destroy) { + return; + } + + spec->destroy(spec, self); +} + +void +graph_node_register_io(GraphNode * self, ProcessingState * state) +{ + if (!self) { + return; + } + GraphNodeSpecification *spec = self->specification; + if (!spec || !spec->register_io) { + return; + } + spec->register_io(spec, self, state); +} + +void +graph_channel_list_init(GraphChannelList * lst) +{ + lst->length = 0; + lst->elements = NULL; +} + +void +graph_channel_list_deinit(GraphChannelList * lst) +{ + if (lst->elements) { + free(lst->elements); + lst->elements = NULL; + } + lst->length = 0; +} diff --git a/graph.h b/graph.h new file mode 100644 index 0000000..3f8c5e5 --- /dev/null +++ b/graph.h @@ -0,0 +1,44 @@ +#ifndef GRAPH_H_ +#define GRAPH_H_ + +#include "events.h" +#include "processing.h" + +typedef struct graph_node GraphNode; +typedef struct graph_channel GraphChannel; +typedef struct graph_node_specification GraphNodeSpecification; + +typedef struct { + size_t length; + GraphChannel ** elements; +} GraphChannelList; + +#define EMPTY_GRAPH_CHANNEL_LIST ((GraphChannelList) {.length = 0, .elements = NULL}) + +struct graph_node { + EventPositionBase as_EventPositionBase; + GraphNodeSpecification * specification; + GraphChannelList inputs, outputs; +}; + +struct graph_channel { + EventPositionBase as_EventPositionBase; + GraphNode *start, *end; + size_t idx_start, idx_end; +}; + +struct graph_node_specification { + GraphNode * (*create)(GraphNodeSpecification * self); + void (*destroy)(GraphNodeSpecification * self, GraphNode * target); + void (*register_io)(GraphNodeSpecification * self, GraphNode * target, ProcessingState * state); + char *name; +}; + +void graph_channel_init(GraphChannel * ch, GraphNode * start, size_t start_idx, GraphNode * end, size_t end_idx); +GraphNode *graph_node_new(GraphNodeSpecification * spec); +void graph_node_delete(GraphNode * self); +void graph_node_register_io(GraphNode * self, ProcessingState * state); +void graph_channel_list_init(GraphChannelList * lst); +void graph_channel_list_deinit(GraphChannelList * lst); + +#endif /* end of include guard: GRAPH_H_ */ diff --git a/main.c b/main.c new file mode 100644 index 0000000..2832591 --- /dev/null +++ b/main.c @@ -0,0 +1,35 @@ +#include "processing.h" +#include "nodes/print.h" +#include "nodes/getchar.h" + +int +main(int argc, char ** argv) +{ + (void)argc; + (void)argv; + + ProcessingState state = (ProcessingState) { + .wait_delay = NULL, + }; + clock_gettime(CLOCK_MONOTONIC, &state.reached_time); + io_subscription_list_init(&state.wait_input, 5); + io_subscription_list_init(&state.wait_output, 5); + + GraphNode * input_node = graph_node_new(&nodespec_getchar); + GraphNode * output_node = graph_node_new(&nodespec_print); + GraphChannel chan; + graph_channel_init(&chan, input_node, 0, output_node, 0); + graph_node_register_io(input_node, &state); + graph_node_register_io(output_node, &state); + + while (true) { + process_iteration(&state); + } + + graph_node_delete(output_node); + graph_node_delete(input_node); + + io_subscription_list_deinit(&state.wait_output); + io_subscription_list_deinit(&state.wait_input); + return 0; +} diff --git a/modifiers.h b/modifiers.h new file mode 100644 index 0000000..58d7f52 --- /dev/null +++ b/modifiers.h @@ -0,0 +1,78 @@ +#ifndef MODIFIERS_H_ +#define MODIFIERS_H_ + +#include "defs.h" +#include + +typedef struct { + size_t byte_length; + uint8_t *bits; +} ModifierSet; + +#define EMPTY_MODIFIER_SET ((ModifierSet) {.byte_length = 0, .bits = NULL}) + +__attribute__((unused)) inline static ModifierSet +modifier_set_copy(const ModifierSet old) +{ + ModifierSet result = old; + result.bits = malloc(result.byte_length); + if (!result.bits) { + result.byte_length = 0; + return result; + } + memcpy(result.bits, old.bits, result.byte_length); + return result; +} + +__attribute__((unused)) inline static void +modifier_set_destruct(ModifierSet * old) +{ + if (old->bits) { + free(old->bits); + } + old->bits = NULL; + old->byte_length = 0; +} + +__attribute__((unused)) inline static bool +modifier_set_extend(ModifierSet * old, size_t new_byte_length) +{ + if (new_byte_length > old->byte_length) { + uint8_t *bits = realloc(old->bits, new_byte_length); + if (!bits) { + return false; + } + memset(bits + old->byte_length, 0, new_byte_length - old->byte_length); + old->bits = bits; + old->byte_length = new_byte_length; + } + return true; +} + +__attribute__((unused)) inline static void +modifier_set_set_from(ModifierSet * target, const ModifierSet source) +{ + modifier_set_extend(target, source.byte_length); + for (size_t i = 0; i < target->byte_length; ++i) { + target->bits[i] |= source.bits[i]; + } +} + +__attribute__((unused)) inline static void +modifier_set_unset_from(ModifierSet * target, const ModifierSet source) +{ + for (size_t i = 0; i < target->byte_length; ++i) { + target->bits[i] &= ~source.bits[i]; + } +} + +__attribute__((unused)) inline static void +modifier_set_toggle_from(ModifierSet * target, const ModifierSet source) +{ + modifier_set_extend(target, source.byte_length); + for (size_t i = 0; i < target->byte_length; ++i) { + target->bits[i] ^= source.bits[i]; + } +} + +#endif /* end of include guard: MODIFIERS_H_ */ diff --git a/nodes/getchar.c b/nodes/getchar.c new file mode 100644 index 0000000..f738d11 --- /dev/null +++ b/nodes/getchar.c @@ -0,0 +1,94 @@ +#include +#include +#include "getchar.h" +#include "../processing.h" + +typedef struct { + GraphNode as_GraphNode; + IOHandling subscription; +} GetcharGraphNode; + +static void +handle_io(EventPositionBase * self, int fd, bool is_output) +{ + (void) is_output; + GetcharGraphNode * node = DOWNCAST(GetcharGraphNode, GraphNode, DOWNCAST(GraphNode, EventPositionBase, self)); + char buf[1]; + ssize_t status = read(fd, buf, 1); + if (status < 0) { + perror("Failed to read character"); + return; + } + EventData data = { + .code = { + .ns = 0, + .value = 1, + }, + .ttl = 100, + .priority = 10, + .payload = buf[0], + .modifiers = EMPTY_MODIFIER_SET, + }; + clock_gettime(CLOCK_MONOTONIC, &data.time); + if (status == 0) { + node->subscription.enabled = false; + data.code.value = 2; + data.payload = 0; + } + for (size_t i = 0; i < node->as_GraphNode.outputs.length; ++i) { + EventNode *ev = event_create(&data); + if (!ev) { + perror("Failed to create event"); + break; + } + ev->position = &node->as_GraphNode.outputs.elements[i]->as_EventPositionBase; + } +} + +static GraphNode * +create(GraphNodeSpecification * spec) +{ + GetcharGraphNode * node = calloc(1, sizeof(GetcharGraphNode)); + if (!node) { + return NULL; + } + *node = (GetcharGraphNode) { + .as_GraphNode = { + .as_EventPositionBase = { + .handle_event = NULL, + .waiting_new_event = false, + }, + .specification = spec, + .inputs = EMPTY_GRAPH_CHANNEL_LIST, + .outputs = EMPTY_GRAPH_CHANNEL_LIST, + }, + .subscription = { + .self = &node->as_GraphNode.as_EventPositionBase, + .handle_io = handle_io, + .enabled = true, + }, + }; + return &node->as_GraphNode; +} + +static void destroy +(GraphNodeSpecification * self, GraphNode * target) +{ + (void) self; + free(target); +} + +static void +register_io(GraphNodeSpecification * self, GraphNode * target, ProcessingState * state) +{ + (void) self; + GetcharGraphNode * node = DOWNCAST(GetcharGraphNode, GraphNode, target); + io_subscription_list_add(&state->wait_input, fileno(stdin), &node->subscription); +} + +GraphNodeSpecification nodespec_getchar = (GraphNodeSpecification) { + .create = &create, + .destroy = &destroy, + .register_io = ®ister_io, + .name = "getchar", +}; diff --git a/nodes/getchar.h b/nodes/getchar.h new file mode 100644 index 0000000..d25852f --- /dev/null +++ b/nodes/getchar.h @@ -0,0 +1,8 @@ +#ifndef NODES_GETCHAR_H_ +#define NODES_GETCHAR_H_ + +#include "../graph.h" + +extern GraphNodeSpecification nodespec_getchar; + +#endif /* end of include guard: NODES_GETCHAR_H_ */ diff --git a/nodes/print.c b/nodes/print.c new file mode 100644 index 0000000..88cb9ac --- /dev/null +++ b/nodes/print.c @@ -0,0 +1,59 @@ +#include +#include "print.h" + +static bool +handle_event(EventPositionBase * self, EventNode * event) +{ +#define PRINT_FIELD(fmt, path) printf("%s = " fmt "\n", #path, data.path) + (void) self; + EventData data = event->data; + printf("Event from connector %ld:\n", event->input_index); + PRINT_FIELD("%d", code.ns); + PRINT_FIELD("%d", code.value); + PRINT_FIELD("%d", ttl); + PRINT_FIELD("%d", priority); + PRINT_FIELD("%ld", payload); + printf("modifiers = "); + for (ssize_t i = data.modifiers.byte_length - 1; i >= 0; --i) { + printf("%02x", data.modifiers.bits[i]); + } + printf("\n"); + printf("time = %ld.%09ld\n", data.time.tv_sec, data.time.tv_nsec); + printf("---\n\n"); + event_destroy(event); + return true; +#undef PRINT_FIELD +} + +static GraphNode * +create(GraphNodeSpecification * spec) +{ + GraphNode * node = calloc(1, sizeof(GraphNode)); + if (!node) { + return node; + } + *node = (GraphNode) { + .as_EventPositionBase = { + .handle_event = &handle_event, + .waiting_new_event = false, + }, + .specification = spec, + .inputs = EMPTY_GRAPH_CHANNEL_LIST, + .outputs = EMPTY_GRAPH_CHANNEL_LIST, + }; + return node; +} + +static void destroy +(GraphNodeSpecification * self, GraphNode * target) +{ + (void) self; + free(target); +} + +GraphNodeSpecification nodespec_print = (GraphNodeSpecification) { + .create = &create, + .destroy = &destroy, + .register_io = NULL, + .name = "print", +}; diff --git a/nodes/print.h b/nodes/print.h new file mode 100644 index 0000000..e90c49e --- /dev/null +++ b/nodes/print.h @@ -0,0 +1,8 @@ +#ifndef NODES_PRINT_H_ +#define NODES_PRINT_H_ + +#include "../graph.h" + +extern GraphNodeSpecification nodespec_print; + +#endif /* end of include guard: NODES_PRINT_H_ */ diff --git a/processing.c b/processing.c new file mode 100644 index 0000000..734195e --- /dev/null +++ b/processing.c @@ -0,0 +1,361 @@ +#include +#include +#include +#include +#include "processing.h" + +static bool +io_subscription_list_extend(IOSubscriptionList * lst) +{ + size_t capacity = lst->capacity; + capacity = capacity + (capacity >> 1) + 1; + + int * new_fds = reallocarray(lst->fds, capacity, sizeof(int)); + if (!new_fds) { + return false; + } + lst->fds = new_fds; + + IOHandling ** new_subscribers = reallocarray(lst->subscribers, capacity, sizeof(IOHandling*)); + if (!new_subscribers) { + return false; + } + lst->subscribers = new_subscribers; + + lst->capacity = capacity; + return true; +} + +void +io_subscription_list_init(IOSubscriptionList * lst, size_t capacity) +{ + IOSubscriptionList result = { + .length = 0, + .capacity = 0, + .fds = NULL, + .subscribers = NULL, + }; + result.fds = calloc(capacity, sizeof(int)); + result.subscribers = calloc(capacity, sizeof(IOHandling*)); + if (!result.fds || !result.subscribers) + capacity = 0; + result.capacity = capacity; + *lst = result; +} + +void +io_subscription_list_deinit(IOSubscriptionList * lst) +{ + if (lst->fds) + free(lst->fds); + if (lst->subscribers) + free(lst->subscribers); + *lst = (IOSubscriptionList) { + .length = 0, + .capacity = 0, + .fds = NULL, + .subscribers = NULL, + }; +} + +void +io_subscription_list_add(IOSubscriptionList * lst, int fd, IOHandling *subscriber) +{ + if (lst->length >= lst->capacity) { + if (!io_subscription_list_extend(lst)) { + perror("Failed to extend io subscription list"); + exit(1); + } + } + assert(lst->length < lst->capacity); + size_t i = lst->length; + lst->fds[i] = fd; + lst->subscribers[i] = subscriber; + lst->length = i + 1; +} + +static int +populate_fd_set(fd_set * fds, IOSubscriptionList * src, int old_max_fd) +{ + FD_ZERO(fds); + for (size_t i = 0; i < src->length; ++i) { + IOHandling *subscriber = src->subscribers[i]; + if (!subscriber) { + continue; + } + if (!subscriber->enabled) { + continue; + } + int fd = src->fds[i]; + if (fd > old_max_fd) { + old_max_fd = fd; + } + FD_SET(fd, fds); + } + return old_max_fd; +} + +static void +run_io_handlers(fd_set * fds, IOSubscriptionList * subs, bool arg) +{ + for (size_t i = 0; i < subs->length; ++i) { + int fd = subs->fds[i]; + if (FD_ISSET(fd, fds)) { + IOHandling *subscriber = subs->subscribers[i]; + if (!subscriber) { + continue; + } + if (!subscriber->enabled) { + continue; + } + void (*callback) (EventPositionBase*, int, bool) = subscriber->handle_io; + if (callback) { + callback(subscriber->self, fd, arg); + } + } + } +} + +bool +process_io(ProcessingState * state, const struct timespec * timeout) +{ + int max_fd = 0; + fd_set readfds, writefds; + + max_fd = populate_fd_set(&readfds, &state->wait_input, max_fd); + max_fd = populate_fd_set(&writefds, &state->wait_output, max_fd); + + ++max_fd; + int ready = pselect(max_fd, &readfds, &writefds, NULL, timeout, NULL); + + if (ready < 0) { + FD_ZERO(&readfds); + FD_ZERO(&writefds); + return false; + } + + run_io_handlers(&readfds, &state->wait_input, false); + run_io_handlers(&writefds, &state->wait_output, true); + + FD_ZERO(&readfds); + FD_ZERO(&writefds); + return true; +} + +bool +schedule_delay(ProcessingState * state, EventPositionBase * target, void (*callback) (EventPositionBase*, void*, const struct timespec*), const struct timespec * time) +{ + DelayList **next = &state->wait_delay; + while (*next) { + struct timespec next_time = (*next)->time; + if (next_time.tv_sec > time->tv_sec) { + break; + } + if (next_time.tv_sec == time->tv_sec) { + if (next_time.tv_nsec > time->tv_nsec) { + break; + } + } + next = &((*next)->next); + } + + DelayList * current = malloc(sizeof(DelayList)); + if (!current) { + return false; + } + + *current = (DelayList) { + .callback = callback, + .target = target, + .next = *next, + .time = *time, + }; + *next = current; + return true; +} + +static const struct timespec ZERO_TS = { + .tv_sec = 0, + .tv_nsec = 0, +}; + +inline static void +fix_nsec(struct timespec * ts) +{ + if (ts->tv_nsec < 0) { + ts->tv_nsec += 1000000000; + ts->tv_sec -= 1; + } +} + +static bool +process_single_scheduled(ProcessingState * state, const struct timespec extern_time) +{ + if (!state->wait_delay) { + return false; + } + struct timespec next_scheduled_time = state->wait_delay->time; // abs + if (next_scheduled_time.tv_sec > extern_time.tv_sec) { + return false; + } else if (next_scheduled_time.tv_sec == extern_time.tv_sec) { + if (next_scheduled_time.tv_nsec > extern_time.tv_nsec) { + return false; + } + } + DelayList next_scheduled = *state->wait_delay; + free(state->wait_delay); + state->wait_delay = next_scheduled.next; + + if (next_scheduled.callback) { + next_scheduled.callback( + next_scheduled.target, + next_scheduled.closure, + &next_scheduled_time + ); + } + return true; +} + +static bool +process_events_until(ProcessingState * state, const struct timespec * max_time) +{ + bool stable = true; + int32_t next_priority = INT32_MIN; + state->has_future_events = false; + + FOREACH_EVENT(ev) { + if (max_time) { + struct timespec ev_time = ev->data.time; + if (ev_time.tv_sec > max_time->tv_sec) { + // stable = false; + state->has_future_events = true; + break; + } else if (ev_time.tv_sec == max_time->tv_sec) { + if (ev_time.tv_nsec > max_time->tv_nsec) { + // stable = false; + state->has_future_events = true; + break; + } + } + } + + if (ev->data.priority > next_priority) { + next_priority = ev->data.priority; + } + } + + while (next_priority > INT32_MIN) { + state->pass_priority = next_priority; + next_priority = INT32_MIN; + FOREACH_EVENT(ev) { + int32_t ev_priority = ev->data.priority; + if (ev_priority < state->pass_priority) { + if (ev_priority > next_priority) { + next_priority = ev_priority; + } + } else if (ev_priority > state->pass_priority) { + continue; + } + + EventPositionBase *position = ev->position; + if (!position) { + continue; + } + if (position->waiting_new_event) { + continue; + } + bool (*handler) (EventPositionBase*, EventNode*) = position->handle_event; + if (!handler) { + continue; + } + + if (max_time) { + struct timespec ev_time = ev->data.time; + if (ev_time.tv_sec > max_time->tv_sec) { + state->has_future_events = true; + break; + } else if (ev_time.tv_sec == max_time->tv_sec) { + if (ev_time.tv_nsec > max_time->tv_nsec) { + state->has_future_events = true; + break; + } + } + } + + stable = false; + bool should_rewind = handler(position, ev); + if (should_rewind) { + // ev = &END_EVENTS; // Will be set to FIRST_EVENT by loop increment + next_priority = INT32_MIN; // Break out of the outermost loop + break; + } + } + } + + state->reached_time = *max_time; + FOREACH_EVENT(ev) { + state->reached_time = ev->data.time; + break; + } + + return !stable; +} + +void +process_iteration(ProcessingState * state) +{ + struct timespec extern_time; + if (clock_gettime(CLOCK_MONOTONIC, &extern_time) < 0) { + perror("Failed to get time"); + exit(1); + } + + // late_by.tv_sec = extern_time.tv_sec - state->reached_time.tv_sec; + // late_by.tv_nsec = extern_time.tv_nsec - state->reached_time.tv_nsec; + // fix_nsec(&late_by); + + struct timespec next_scheduled_delay; + const struct timespec *max_io_timeout = NULL; + if (state->has_future_events) { + max_io_timeout = &ZERO_TS; + } else { + if (state->wait_delay) { + next_scheduled_delay = state->wait_delay->time; // abs + next_scheduled_delay.tv_sec -= extern_time.tv_sec; + next_scheduled_delay.tv_nsec -= extern_time.tv_nsec; + fix_nsec(&next_scheduled_delay); + if (next_scheduled_delay.tv_sec < 0) { + max_io_timeout = &ZERO_TS; + } else { + max_io_timeout = &next_scheduled_delay; + } + } + } + + // FIXME reason about timeouts + process_io(state, max_io_timeout); + // process_io(state, &ZERO_TS); + + while (true) { + bool had_scheduled = process_single_scheduled(state, extern_time); + const struct timespec *max_event_time = &extern_time; + if (state->wait_delay) { + struct timespec next_scheduled_time = state->wait_delay->time; + bool use_scheduled = false; + if (!use_scheduled) { + use_scheduled = next_scheduled_time.tv_sec > extern_time.tv_sec; + } + if (!use_scheduled) { + use_scheduled = next_scheduled_time.tv_nsec > extern_time.tv_nsec; + } + if (use_scheduled) { + max_event_time = &state->wait_delay->time; + } + } + bool had_events = process_events_until(state, max_event_time); + if (!had_scheduled && !had_events) { + break; + } + process_io(state, &ZERO_TS); + } +} diff --git a/processing.h b/processing.h new file mode 100644 index 0000000..8452dc8 --- /dev/null +++ b/processing.h @@ -0,0 +1,49 @@ +#ifndef PROCESSING_H_ +#define PROCESSING_H_ + +#include +#include "events.h" + +typedef struct io_handling IOHandling; + +// no virtual multiinheritance +struct io_handling { + EventPositionBase * self; + void (*handle_io) (EventPositionBase * self, int fd, bool is_output); + bool enabled; +}; + +typedef struct { + size_t length; + size_t capacity; + int *fds; + IOHandling **subscribers; +} IOSubscriptionList; + +typedef struct delay_list DelayList; + +struct delay_list { + void (*callback) (EventPositionBase * target, void * closure, const struct timespec * time); + EventPositionBase *target; + void *closure; + DelayList *next; + struct timespec time; +}; + +typedef struct { + IOSubscriptionList wait_input, wait_output; + DelayList *wait_delay; + struct timespec reached_time; + int32_t pass_priority; + bool has_future_events; +} ProcessingState; + +void io_subscription_list_init(IOSubscriptionList * lst, size_t capacity); +void io_subscription_list_deinit(IOSubscriptionList * lst); +void io_subscription_list_add(IOSubscriptionList * lst, int fd, IOHandling *subscriber); + +bool schedule_delay(ProcessingState * state, EventPositionBase * target, void (*callback) (EventPositionBase*, void*, const struct timespec*), const struct timespec * time); +bool process_io(ProcessingState * state, const struct timespec * timeout); +void process_iteration(ProcessingState * state); + +#endif /* end of include guard: PROCESSING_H_ */