Initial commit

This commit is contained in:
Vftdan 2024-08-13 23:33:52 +02:00
commit d0bcd6b20c
15 changed files with 1067 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*.o
main

23
Makefile Normal file
View File

@ -0,0 +1,23 @@
DEPS =
CFLAGS = -Wall -Wextra
ifdef DEBUG
CFLAGS += -Og -gdwarf-2
CPPFLAGS += -D DEBUG
else
CFLAGS += -O2
endif
CPPFLAGS += $(shell pkg-config --cflags $(DEPS))
LDLIBS += $(shell pkg-config --libs $(DEPS))
INTERP ?=
MAIN = main
OBJS = main.o events.o processing.o graph.o nodes/getchar.o nodes/print.o
all: $(MAIN)
run: $(MAIN)
$(INTERP) ./$(MAIN)
.PHONY: all run
$(MAIN): $(OBJS)
$(CC) $(LDFLAGS) $^ $(LOADLIBES) $(LDLIBS) -o $@

16
defs.h Normal file
View File

@ -0,0 +1,16 @@
#ifndef DEFS_H_
#define DEFS_H_
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <stddef.h>
#define containerof(ptr, contype, membpath) ((contype*)(0 ? (void)(((contype*)NULL)->membpath = *(ptr)) : (void)0, ((char *)(ptr)) - offsetof(contype, membpath)))
// Assuming child type has a field for the base type
// So for structs it is usually actual downcast, but for unions it is an upcast
#define DOWNCAST(contype, basename, ptr) containerof(ptr, contype, as_##basename)
#define DEBUG_PRINT_VALUE(x, fmt) fprintf(stderr, #x " = " fmt "\n", x); fflush(stderr)
#endif /* end of include guard: DEFS_H_ */

73
events.c Normal file
View File

@ -0,0 +1,73 @@
#include "events.h"
EventNode
END_EVENTS = {
.prev = &END_EVENTS,
.next = &END_EVENTS,
.position = NULL,
.input_index = 0,
};
size_t
event_replicate(EventNode * source, size_t count)
{
size_t i;
for (i = 0; i < count; ++i) {
EventNode * replica = malloc(sizeof(EventNode));
if (!replica) {
break;
}
replica->position = NULL;
replica->input_index = 0;
replica->data = source->data;
replica->data.modifiers = modifier_set_copy(source->data.modifiers);
replica->prev = source;
replica->next = source->next;
source->next->next->prev = replica;
source->next = replica;
}
return i;
}
EventNode *
event_create(const EventData * content)
{
EventNode * event = calloc(1, sizeof(EventNode));
if (content) {
event->data = *content;
event->data.modifiers = modifier_set_copy(content->modifiers);
} else {
clock_gettime(CLOCK_MONOTONIC, &event->data.time);
}
struct timespec self_time = event->data.time;
EventNode ** list_pos = &LAST_EVENT;
FOREACH_EVENT_DESC(other) {
struct timespec other_time = other->data.time;
if (self_time.tv_sec > other_time.tv_sec) {
break;
}
if (self_time.tv_sec == other_time.tv_sec) {
if (self_time.tv_nsec >= other_time.tv_nsec) {
break;
}
}
list_pos = &other->next->prev;
}
EventNode * prev = *list_pos;
event->next = prev->next;
event->prev = prev;
prev->next->prev = event;
prev->next = event;
return event;
}
void
event_destroy(EventNode * self)
{
modifier_set_destruct(&self->data.modifiers);
self->next->prev = self->prev;
self->prev->next = self->next;
self->prev = NULL;
self->next = NULL;
free(self);
}

50
events.h Normal file
View File

@ -0,0 +1,50 @@
#ifndef EVENTS_H_
#define EVENTS_H_
#include <time.h>
#include "defs.h"
#include "modifiers.h"
typedef uint32_t EventNamespace;
typedef struct {
EventNamespace ns;
uint32_t value;
} EventCode;
typedef struct {
EventCode code;
uint32_t ttl;
int32_t priority;
int64_t payload;
ModifierSet modifiers;
struct timespec time;
} EventData;
typedef struct event_position_base EventPositionBase;
typedef struct event_node EventNode;
struct event_position_base {
bool (*handle_event) (EventPositionBase * self, EventNode * event); // If returns false, the scheduler should not rewind back to the start. Must return true if any events were deleted
bool waiting_new_event; // Skip from handling until it is set to true. Assigning this position to a event should unset this flag
};
struct event_node {
EventNode *prev, *next;
EventPositionBase *position;
size_t input_index;
EventData data;
};
extern EventNode END_EVENTS;
#define FIRST_EVENT (END_EVENTS.next)
#define LAST_EVENT (END_EVENTS.prev)
#define FOREACH_EVENT(ev) for (EventNode *ev = FIRST_EVENT; ev && (ev != &END_EVENTS); ev = ev->next)
#define FOREACH_EVENT_DESC(ev) for (EventNode *ev = LAST_EVENT; ev && (ev != &END_EVENTS); ev = ev->prev)
// Creates count replicas after the source event in the list, position is NULL, returns the number of successfully created replicas
size_t event_replicate(EventNode * source, size_t count);
EventNode * event_create(const EventData * content);
void event_destroy(EventNode * self);
#endif /* end of include guard: EVENTS_H_ */

167
graph.c Normal file
View File

@ -0,0 +1,167 @@
#include <stdio.h>
#include "graph.h"
static void
graph_channel_list_resize(GraphChannelList * lst, size_t target)
{
if (target > lst->length) {
GraphChannel **new_elements = reallocarray(lst->elements, target, sizeof(GraphChannel*));
if (!new_elements) {
return;
}
memset(&new_elements[lst->length], 0, (target - lst->length) * sizeof(GraphChannel*));
lst->elements = new_elements;
}
lst->length = target;
}
inline static void
graph_channel_list_ensure(GraphChannelList * lst, size_t idx)
{
size_t min_length = idx + 1;
if (lst->length < min_length) {
graph_channel_list_resize(lst, min_length);
}
if (lst->length < min_length) {
perror("Failed to resize graph channel list");
exit(1);
}
}
static bool
channel_handle_event(EventPositionBase * self, EventNode * event)
{
GraphChannel *ch = DOWNCAST(GraphChannel, EventPositionBase, self);
if (event->data.ttl == 0) {
event_destroy(event);
return true;
}
event->data.ttl--;
if (event->data.ttl == 0) {
event_destroy(event);
return true;
}
GraphNode * target = ch->end;
if (!target) {
event_destroy(event);
return true;
}
event->position = &target->as_EventPositionBase;
event->input_index = ch->idx_end;
target->as_EventPositionBase.waiting_new_event = false;
return false; // Continue processing events
}
void
graph_channel_init(GraphChannel * ch, GraphNode * start, size_t start_idx, GraphNode * end, size_t end_idx)
{
if (start) {
graph_channel_list_ensure(&start->outputs, start_idx);
GraphChannel ** old_ch = &start->outputs.elements[start_idx];
if (*old_ch) {
if ((*old_ch)->start == start && (*old_ch)->idx_start == start_idx) {
(*old_ch)->start = NULL;
// TODO on orphaned check
}
}
*old_ch = ch;
}
if (end) {
graph_channel_list_ensure(&end->inputs, end_idx);
GraphChannel ** old_ch = &end->inputs.elements[end_idx];
if (*old_ch) {
if ((*old_ch)->end == end && (*old_ch)->idx_end == end_idx) {
(*old_ch)->end = NULL;
// TODO on orphaned check
}
}
*old_ch = ch;
}
ch->start = start;
ch->end = end;
ch->idx_start = start_idx;
ch->idx_end = end_idx;
ch->as_EventPositionBase.handle_event = &channel_handle_event;
ch->as_EventPositionBase.waiting_new_event = false;
}
GraphNode *
graph_node_new(GraphNodeSpecification * spec)
{
if (!spec || !spec->create) {
return NULL;
}
return spec->create(spec);
}
void
graph_node_delete(GraphNode * self)
{
if (!self) {
return;
}
for (size_t i = 0; i < self->inputs.length; ++i) {
GraphChannel *ch = self->inputs.elements[i];
if (ch && ch->end == self) {
ch->end = NULL;
if (!ch->start && !ch->end) {
// free(ch); // TODO on orphaned
}
}
self->inputs.elements[i] = NULL;
}
for (size_t i = 0; i < self->outputs.length; ++i) {
GraphChannel *ch = self->outputs.elements[i];
if (ch && ch->start == self) {
ch->start = NULL;
if (!ch->start && !ch->end) {
// free(ch); // TODO on orphaned
}
}
self->outputs.elements[i] = NULL;
}
graph_channel_list_deinit(&self->inputs);
graph_channel_list_deinit(&self->outputs);
GraphNodeSpecification *spec = self->specification;
if (!spec || !spec->destroy) {
return;
}
spec->destroy(spec, self);
}
void
graph_node_register_io(GraphNode * self, ProcessingState * state)
{
if (!self) {
return;
}
GraphNodeSpecification *spec = self->specification;
if (!spec || !spec->register_io) {
return;
}
spec->register_io(spec, self, state);
}
void
graph_channel_list_init(GraphChannelList * lst)
{
lst->length = 0;
lst->elements = NULL;
}
void
graph_channel_list_deinit(GraphChannelList * lst)
{
if (lst->elements) {
free(lst->elements);
lst->elements = NULL;
}
lst->length = 0;
}

44
graph.h Normal file
View File

@ -0,0 +1,44 @@
#ifndef GRAPH_H_
#define GRAPH_H_
#include "events.h"
#include "processing.h"
typedef struct graph_node GraphNode;
typedef struct graph_channel GraphChannel;
typedef struct graph_node_specification GraphNodeSpecification;
typedef struct {
size_t length;
GraphChannel ** elements;
} GraphChannelList;
#define EMPTY_GRAPH_CHANNEL_LIST ((GraphChannelList) {.length = 0, .elements = NULL})
struct graph_node {
EventPositionBase as_EventPositionBase;
GraphNodeSpecification * specification;
GraphChannelList inputs, outputs;
};
struct graph_channel {
EventPositionBase as_EventPositionBase;
GraphNode *start, *end;
size_t idx_start, idx_end;
};
struct graph_node_specification {
GraphNode * (*create)(GraphNodeSpecification * self);
void (*destroy)(GraphNodeSpecification * self, GraphNode * target);
void (*register_io)(GraphNodeSpecification * self, GraphNode * target, ProcessingState * state);
char *name;
};
void graph_channel_init(GraphChannel * ch, GraphNode * start, size_t start_idx, GraphNode * end, size_t end_idx);
GraphNode *graph_node_new(GraphNodeSpecification * spec);
void graph_node_delete(GraphNode * self);
void graph_node_register_io(GraphNode * self, ProcessingState * state);
void graph_channel_list_init(GraphChannelList * lst);
void graph_channel_list_deinit(GraphChannelList * lst);
#endif /* end of include guard: GRAPH_H_ */

35
main.c Normal file
View File

@ -0,0 +1,35 @@
#include "processing.h"
#include "nodes/print.h"
#include "nodes/getchar.h"
int
main(int argc, char ** argv)
{
(void)argc;
(void)argv;
ProcessingState state = (ProcessingState) {
.wait_delay = NULL,
};
clock_gettime(CLOCK_MONOTONIC, &state.reached_time);
io_subscription_list_init(&state.wait_input, 5);
io_subscription_list_init(&state.wait_output, 5);
GraphNode * input_node = graph_node_new(&nodespec_getchar);
GraphNode * output_node = graph_node_new(&nodespec_print);
GraphChannel chan;
graph_channel_init(&chan, input_node, 0, output_node, 0);
graph_node_register_io(input_node, &state);
graph_node_register_io(output_node, &state);
while (true) {
process_iteration(&state);
}
graph_node_delete(output_node);
graph_node_delete(input_node);
io_subscription_list_deinit(&state.wait_output);
io_subscription_list_deinit(&state.wait_input);
return 0;
}

78
modifiers.h Normal file
View File

@ -0,0 +1,78 @@
#ifndef MODIFIERS_H_
#define MODIFIERS_H_
#include "defs.h"
#include <string.h>
typedef struct {
size_t byte_length;
uint8_t *bits;
} ModifierSet;
#define EMPTY_MODIFIER_SET ((ModifierSet) {.byte_length = 0, .bits = NULL})
__attribute__((unused)) inline static ModifierSet
modifier_set_copy(const ModifierSet old)
{
ModifierSet result = old;
result.bits = malloc(result.byte_length);
if (!result.bits) {
result.byte_length = 0;
return result;
}
memcpy(result.bits, old.bits, result.byte_length);
return result;
}
__attribute__((unused)) inline static void
modifier_set_destruct(ModifierSet * old)
{
if (old->bits) {
free(old->bits);
}
old->bits = NULL;
old->byte_length = 0;
}
__attribute__((unused)) inline static bool
modifier_set_extend(ModifierSet * old, size_t new_byte_length)
{
if (new_byte_length > old->byte_length) {
uint8_t *bits = realloc(old->bits, new_byte_length);
if (!bits) {
return false;
}
memset(bits + old->byte_length, 0, new_byte_length - old->byte_length);
old->bits = bits;
old->byte_length = new_byte_length;
}
return true;
}
__attribute__((unused)) inline static void
modifier_set_set_from(ModifierSet * target, const ModifierSet source)
{
modifier_set_extend(target, source.byte_length);
for (size_t i = 0; i < target->byte_length; ++i) {
target->bits[i] |= source.bits[i];
}
}
__attribute__((unused)) inline static void
modifier_set_unset_from(ModifierSet * target, const ModifierSet source)
{
for (size_t i = 0; i < target->byte_length; ++i) {
target->bits[i] &= ~source.bits[i];
}
}
__attribute__((unused)) inline static void
modifier_set_toggle_from(ModifierSet * target, const ModifierSet source)
{
modifier_set_extend(target, source.byte_length);
for (size_t i = 0; i < target->byte_length; ++i) {
target->bits[i] ^= source.bits[i];
}
}
#endif /* end of include guard: MODIFIERS_H_ */

94
nodes/getchar.c Normal file
View File

@ -0,0 +1,94 @@
#include <stdio.h>
#include <unistd.h>
#include "getchar.h"
#include "../processing.h"
typedef struct {
GraphNode as_GraphNode;
IOHandling subscription;
} GetcharGraphNode;
static void
handle_io(EventPositionBase * self, int fd, bool is_output)
{
(void) is_output;
GetcharGraphNode * node = DOWNCAST(GetcharGraphNode, GraphNode, DOWNCAST(GraphNode, EventPositionBase, self));
char buf[1];
ssize_t status = read(fd, buf, 1);
if (status < 0) {
perror("Failed to read character");
return;
}
EventData data = {
.code = {
.ns = 0,
.value = 1,
},
.ttl = 100,
.priority = 10,
.payload = buf[0],
.modifiers = EMPTY_MODIFIER_SET,
};
clock_gettime(CLOCK_MONOTONIC, &data.time);
if (status == 0) {
node->subscription.enabled = false;
data.code.value = 2;
data.payload = 0;
}
for (size_t i = 0; i < node->as_GraphNode.outputs.length; ++i) {
EventNode *ev = event_create(&data);
if (!ev) {
perror("Failed to create event");
break;
}
ev->position = &node->as_GraphNode.outputs.elements[i]->as_EventPositionBase;
}
}
static GraphNode *
create(GraphNodeSpecification * spec)
{
GetcharGraphNode * node = calloc(1, sizeof(GetcharGraphNode));
if (!node) {
return NULL;
}
*node = (GetcharGraphNode) {
.as_GraphNode = {
.as_EventPositionBase = {
.handle_event = NULL,
.waiting_new_event = false,
},
.specification = spec,
.inputs = EMPTY_GRAPH_CHANNEL_LIST,
.outputs = EMPTY_GRAPH_CHANNEL_LIST,
},
.subscription = {
.self = &node->as_GraphNode.as_EventPositionBase,
.handle_io = handle_io,
.enabled = true,
},
};
return &node->as_GraphNode;
}
static void destroy
(GraphNodeSpecification * self, GraphNode * target)
{
(void) self;
free(target);
}
static void
register_io(GraphNodeSpecification * self, GraphNode * target, ProcessingState * state)
{
(void) self;
GetcharGraphNode * node = DOWNCAST(GetcharGraphNode, GraphNode, target);
io_subscription_list_add(&state->wait_input, fileno(stdin), &node->subscription);
}
GraphNodeSpecification nodespec_getchar = (GraphNodeSpecification) {
.create = &create,
.destroy = &destroy,
.register_io = &register_io,
.name = "getchar",
};

8
nodes/getchar.h Normal file
View File

@ -0,0 +1,8 @@
#ifndef NODES_GETCHAR_H_
#define NODES_GETCHAR_H_
#include "../graph.h"
extern GraphNodeSpecification nodespec_getchar;
#endif /* end of include guard: NODES_GETCHAR_H_ */

59
nodes/print.c Normal file
View File

@ -0,0 +1,59 @@
#include <stdio.h>
#include "print.h"
static bool
handle_event(EventPositionBase * self, EventNode * event)
{
#define PRINT_FIELD(fmt, path) printf("%s = " fmt "\n", #path, data.path)
(void) self;
EventData data = event->data;
printf("Event from connector %ld:\n", event->input_index);
PRINT_FIELD("%d", code.ns);
PRINT_FIELD("%d", code.value);
PRINT_FIELD("%d", ttl);
PRINT_FIELD("%d", priority);
PRINT_FIELD("%ld", payload);
printf("modifiers = ");
for (ssize_t i = data.modifiers.byte_length - 1; i >= 0; --i) {
printf("%02x", data.modifiers.bits[i]);
}
printf("\n");
printf("time = %ld.%09ld\n", data.time.tv_sec, data.time.tv_nsec);
printf("---\n\n");
event_destroy(event);
return true;
#undef PRINT_FIELD
}
static GraphNode *
create(GraphNodeSpecification * spec)
{
GraphNode * node = calloc(1, sizeof(GraphNode));
if (!node) {
return node;
}
*node = (GraphNode) {
.as_EventPositionBase = {
.handle_event = &handle_event,
.waiting_new_event = false,
},
.specification = spec,
.inputs = EMPTY_GRAPH_CHANNEL_LIST,
.outputs = EMPTY_GRAPH_CHANNEL_LIST,
};
return node;
}
static void destroy
(GraphNodeSpecification * self, GraphNode * target)
{
(void) self;
free(target);
}
GraphNodeSpecification nodespec_print = (GraphNodeSpecification) {
.create = &create,
.destroy = &destroy,
.register_io = NULL,
.name = "print",
};

8
nodes/print.h Normal file
View File

@ -0,0 +1,8 @@
#ifndef NODES_PRINT_H_
#define NODES_PRINT_H_
#include "../graph.h"
extern GraphNodeSpecification nodespec_print;
#endif /* end of include guard: NODES_PRINT_H_ */

361
processing.c Normal file
View File

@ -0,0 +1,361 @@
#include <stdio.h>
#include <sys/select.h>
#include <assert.h>
#include <limits.h>
#include "processing.h"
static bool
io_subscription_list_extend(IOSubscriptionList * lst)
{
size_t capacity = lst->capacity;
capacity = capacity + (capacity >> 1) + 1;
int * new_fds = reallocarray(lst->fds, capacity, sizeof(int));
if (!new_fds) {
return false;
}
lst->fds = new_fds;
IOHandling ** new_subscribers = reallocarray(lst->subscribers, capacity, sizeof(IOHandling*));
if (!new_subscribers) {
return false;
}
lst->subscribers = new_subscribers;
lst->capacity = capacity;
return true;
}
void
io_subscription_list_init(IOSubscriptionList * lst, size_t capacity)
{
IOSubscriptionList result = {
.length = 0,
.capacity = 0,
.fds = NULL,
.subscribers = NULL,
};
result.fds = calloc(capacity, sizeof(int));
result.subscribers = calloc(capacity, sizeof(IOHandling*));
if (!result.fds || !result.subscribers)
capacity = 0;
result.capacity = capacity;
*lst = result;
}
void
io_subscription_list_deinit(IOSubscriptionList * lst)
{
if (lst->fds)
free(lst->fds);
if (lst->subscribers)
free(lst->subscribers);
*lst = (IOSubscriptionList) {
.length = 0,
.capacity = 0,
.fds = NULL,
.subscribers = NULL,
};
}
void
io_subscription_list_add(IOSubscriptionList * lst, int fd, IOHandling *subscriber)
{
if (lst->length >= lst->capacity) {
if (!io_subscription_list_extend(lst)) {
perror("Failed to extend io subscription list");
exit(1);
}
}
assert(lst->length < lst->capacity);
size_t i = lst->length;
lst->fds[i] = fd;
lst->subscribers[i] = subscriber;
lst->length = i + 1;
}
static int
populate_fd_set(fd_set * fds, IOSubscriptionList * src, int old_max_fd)
{
FD_ZERO(fds);
for (size_t i = 0; i < src->length; ++i) {
IOHandling *subscriber = src->subscribers[i];
if (!subscriber) {
continue;
}
if (!subscriber->enabled) {
continue;
}
int fd = src->fds[i];
if (fd > old_max_fd) {
old_max_fd = fd;
}
FD_SET(fd, fds);
}
return old_max_fd;
}
static void
run_io_handlers(fd_set * fds, IOSubscriptionList * subs, bool arg)
{
for (size_t i = 0; i < subs->length; ++i) {
int fd = subs->fds[i];
if (FD_ISSET(fd, fds)) {
IOHandling *subscriber = subs->subscribers[i];
if (!subscriber) {
continue;
}
if (!subscriber->enabled) {
continue;
}
void (*callback) (EventPositionBase*, int, bool) = subscriber->handle_io;
if (callback) {
callback(subscriber->self, fd, arg);
}
}
}
}
bool
process_io(ProcessingState * state, const struct timespec * timeout)
{
int max_fd = 0;
fd_set readfds, writefds;
max_fd = populate_fd_set(&readfds, &state->wait_input, max_fd);
max_fd = populate_fd_set(&writefds, &state->wait_output, max_fd);
++max_fd;
int ready = pselect(max_fd, &readfds, &writefds, NULL, timeout, NULL);
if (ready < 0) {
FD_ZERO(&readfds);
FD_ZERO(&writefds);
return false;
}
run_io_handlers(&readfds, &state->wait_input, false);
run_io_handlers(&writefds, &state->wait_output, true);
FD_ZERO(&readfds);
FD_ZERO(&writefds);
return true;
}
bool
schedule_delay(ProcessingState * state, EventPositionBase * target, void (*callback) (EventPositionBase*, void*, const struct timespec*), const struct timespec * time)
{
DelayList **next = &state->wait_delay;
while (*next) {
struct timespec next_time = (*next)->time;
if (next_time.tv_sec > time->tv_sec) {
break;
}
if (next_time.tv_sec == time->tv_sec) {
if (next_time.tv_nsec > time->tv_nsec) {
break;
}
}
next = &((*next)->next);
}
DelayList * current = malloc(sizeof(DelayList));
if (!current) {
return false;
}
*current = (DelayList) {
.callback = callback,
.target = target,
.next = *next,
.time = *time,
};
*next = current;
return true;
}
static const struct timespec ZERO_TS = {
.tv_sec = 0,
.tv_nsec = 0,
};
inline static void
fix_nsec(struct timespec * ts)
{
if (ts->tv_nsec < 0) {
ts->tv_nsec += 1000000000;
ts->tv_sec -= 1;
}
}
static bool
process_single_scheduled(ProcessingState * state, const struct timespec extern_time)
{
if (!state->wait_delay) {
return false;
}
struct timespec next_scheduled_time = state->wait_delay->time; // abs
if (next_scheduled_time.tv_sec > extern_time.tv_sec) {
return false;
} else if (next_scheduled_time.tv_sec == extern_time.tv_sec) {
if (next_scheduled_time.tv_nsec > extern_time.tv_nsec) {
return false;
}
}
DelayList next_scheduled = *state->wait_delay;
free(state->wait_delay);
state->wait_delay = next_scheduled.next;
if (next_scheduled.callback) {
next_scheduled.callback(
next_scheduled.target,
next_scheduled.closure,
&next_scheduled_time
);
}
return true;
}
static bool
process_events_until(ProcessingState * state, const struct timespec * max_time)
{
bool stable = true;
int32_t next_priority = INT32_MIN;
state->has_future_events = false;
FOREACH_EVENT(ev) {
if (max_time) {
struct timespec ev_time = ev->data.time;
if (ev_time.tv_sec > max_time->tv_sec) {
// stable = false;
state->has_future_events = true;
break;
} else if (ev_time.tv_sec == max_time->tv_sec) {
if (ev_time.tv_nsec > max_time->tv_nsec) {
// stable = false;
state->has_future_events = true;
break;
}
}
}
if (ev->data.priority > next_priority) {
next_priority = ev->data.priority;
}
}
while (next_priority > INT32_MIN) {
state->pass_priority = next_priority;
next_priority = INT32_MIN;
FOREACH_EVENT(ev) {
int32_t ev_priority = ev->data.priority;
if (ev_priority < state->pass_priority) {
if (ev_priority > next_priority) {
next_priority = ev_priority;
}
} else if (ev_priority > state->pass_priority) {
continue;
}
EventPositionBase *position = ev->position;
if (!position) {
continue;
}
if (position->waiting_new_event) {
continue;
}
bool (*handler) (EventPositionBase*, EventNode*) = position->handle_event;
if (!handler) {
continue;
}
if (max_time) {
struct timespec ev_time = ev->data.time;
if (ev_time.tv_sec > max_time->tv_sec) {
state->has_future_events = true;
break;
} else if (ev_time.tv_sec == max_time->tv_sec) {
if (ev_time.tv_nsec > max_time->tv_nsec) {
state->has_future_events = true;
break;
}
}
}
stable = false;
bool should_rewind = handler(position, ev);
if (should_rewind) {
// ev = &END_EVENTS; // Will be set to FIRST_EVENT by loop increment
next_priority = INT32_MIN; // Break out of the outermost loop
break;
}
}
}
state->reached_time = *max_time;
FOREACH_EVENT(ev) {
state->reached_time = ev->data.time;
break;
}
return !stable;
}
void
process_iteration(ProcessingState * state)
{
struct timespec extern_time;
if (clock_gettime(CLOCK_MONOTONIC, &extern_time) < 0) {
perror("Failed to get time");
exit(1);
}
// late_by.tv_sec = extern_time.tv_sec - state->reached_time.tv_sec;
// late_by.tv_nsec = extern_time.tv_nsec - state->reached_time.tv_nsec;
// fix_nsec(&late_by);
struct timespec next_scheduled_delay;
const struct timespec *max_io_timeout = NULL;
if (state->has_future_events) {
max_io_timeout = &ZERO_TS;
} else {
if (state->wait_delay) {
next_scheduled_delay = state->wait_delay->time; // abs
next_scheduled_delay.tv_sec -= extern_time.tv_sec;
next_scheduled_delay.tv_nsec -= extern_time.tv_nsec;
fix_nsec(&next_scheduled_delay);
if (next_scheduled_delay.tv_sec < 0) {
max_io_timeout = &ZERO_TS;
} else {
max_io_timeout = &next_scheduled_delay;
}
}
}
// FIXME reason about timeouts
process_io(state, max_io_timeout);
// process_io(state, &ZERO_TS);
while (true) {
bool had_scheduled = process_single_scheduled(state, extern_time);
const struct timespec *max_event_time = &extern_time;
if (state->wait_delay) {
struct timespec next_scheduled_time = state->wait_delay->time;
bool use_scheduled = false;
if (!use_scheduled) {
use_scheduled = next_scheduled_time.tv_sec > extern_time.tv_sec;
}
if (!use_scheduled) {
use_scheduled = next_scheduled_time.tv_nsec > extern_time.tv_nsec;
}
if (use_scheduled) {
max_event_time = &state->wait_delay->time;
}
}
bool had_events = process_events_until(state, max_event_time);
if (!had_scheduled && !had_events) {
break;
}
process_io(state, &ZERO_TS);
}
}

49
processing.h Normal file
View File

@ -0,0 +1,49 @@
#ifndef PROCESSING_H_
#define PROCESSING_H_
#include <sys/select.h>
#include "events.h"
typedef struct io_handling IOHandling;
// no virtual multiinheritance
struct io_handling {
EventPositionBase * self;
void (*handle_io) (EventPositionBase * self, int fd, bool is_output);
bool enabled;
};
typedef struct {
size_t length;
size_t capacity;
int *fds;
IOHandling **subscribers;
} IOSubscriptionList;
typedef struct delay_list DelayList;
struct delay_list {
void (*callback) (EventPositionBase * target, void * closure, const struct timespec * time);
EventPositionBase *target;
void *closure;
DelayList *next;
struct timespec time;
};
typedef struct {
IOSubscriptionList wait_input, wait_output;
DelayList *wait_delay;
struct timespec reached_time;
int32_t pass_priority;
bool has_future_events;
} ProcessingState;
void io_subscription_list_init(IOSubscriptionList * lst, size_t capacity);
void io_subscription_list_deinit(IOSubscriptionList * lst);
void io_subscription_list_add(IOSubscriptionList * lst, int fd, IOHandling *subscriber);
bool schedule_delay(ProcessingState * state, EventPositionBase * target, void (*callback) (EventPositionBase*, void*, const struct timespec*), const struct timespec * time);
bool process_io(ProcessingState * state, const struct timespec * timeout);
void process_iteration(ProcessingState * state);
#endif /* end of include guard: PROCESSING_H_ */