#include #include #include "../graph.h" #include "../module_registry.h" #include "../queue.h" #include "../hash_table.h" typedef TYPED_HASH_TABLE(bool) EventSet; typedef struct { GraphNode as_GraphNode; EventData terminator_prototype; bool has_terminator; bool is_jumping; bool has_max_time, has_max_length; RelativeTime max_time; size_t max_length; size_t additional_step; size_t skip_next; Queue buffer; EventSet buffered_set; } WindowGraphNode; inline static HashTableKey hash_event_ptr(const EventNode * ptr) { return hash_table_key_from_bytes((void*)&ptr, sizeof(ptr)); } inline static bool event_set_has(const EventSet * set, const EventNode * element) { return hash_table_find(set, hash_event_ptr(element)) >= 0; } inline static bool event_set_add(EventSet * set, const EventNode * element) { const bool value = true; return hash_table_insert(set, hash_event_ptr(element), &value) >= 0; } inline static bool event_set_del(EventSet * set, const EventNode * element) { return hash_table_delete_by_key(set, hash_event_ptr(element)); } static EventNode * replicate_and_advance(EventNode ** ptr) { EventNode * old = *ptr; if (!old) { return NULL; } if (event_replicate(old, 1)) { *ptr = old->next; return old; } return NULL; } static void trigger_new_window(WindowGraphNode * node, EventNode * base) { if (!base) { return; } if (node->has_terminator) { EventNode * terminator = replicate_and_advance(&base); if (terminator) { terminator->data.code = node->terminator_prototype.code; terminator->data.modifiers = modifier_set_copy(node->terminator_prototype.modifiers); terminator->data.payload = node->terminator_prototype.payload; // Preserve ttl, priority, time graph_node_broadcast_forward_event(&node->as_GraphNode, terminator); } } size_t step = 1; if (node->is_jumping) { step = queue_length(&node->buffer); } step += node->additional_step; if (step < 1) { step = 1; } while (step > 0) { QueueValue popped; if (!queue_try_pop(&node->buffer, &popped)) { break; } --step; EventNode * event = popped.as_ptr; event_set_del(&node->buffered_set, event); } node->skip_next += step; QUEUE_FOREACH_INDEX(i, &node->buffer) { EventNode *orig = node->buffer.values[i].as_ptr; if (!orig) { continue; } EventNode *recipient = replicate_and_advance(&base); if (!recipient) { continue; } recipient->data = event_data_copy(orig->data); graph_node_broadcast_forward_event(&node->as_GraphNode, recipient); } event_destroy(base); } static bool handle_event(EventPositionBase * self, EventNode * event) { WindowGraphNode *node = DOWNCAST(WindowGraphNode, GraphNode, DOWNCAST(GraphNode, EventPositionBase, self)); if (event_set_has(&node->buffered_set, event)) { return false; } EventNode *replacement; const AbsoluteTime new_time = event->data.time; if (node->has_max_time) { const RelativeTime threshold = node->max_time; while (queue_length(&node->buffer) > 0) { EventNode *first_event = queue_peek(&node->buffer).as_ptr; if (!first_event) { break; } RelativeTime delta = absolute_time_sub_absolute(new_time, first_event->data.time); if (relative_time_cmp(delta, threshold) <= 0) { break; } // Here replacement should occur simultaneously-before the event replacement = replicate_and_advance(&event); if (!replacement) { return false; } event->position = self; replacement->position = NULL; trigger_new_window(node, replacement); } } if (node->skip_next > 0) { --(node->skip_next); return true; } // Replacement should occur after the forwarded replica replacement = NULL; if (event_replicate(event, 1)) { replacement = event->next; } if (event_replicate(event, 1)) { graph_node_broadcast_forward_event(&node->as_GraphNode, event->next); } queue_put(&node->buffer, (QueueValue){.as_ptr = event}); event_set_add(&node->buffered_set, event); if (node->has_max_length) { const size_t threshold = node->max_length; while (replacement && queue_length(&node->buffer) >= threshold) { trigger_new_window(node, replicate_and_advance(&replacement)); } } if (replacement) { event_destroy(replacement); } self->waiting_new_event = true; return true; } static void load_event_prototype(InitializationEnvironment * env, const config_setting_t * setting, EventData * proto) { if (!setting) { return; } proto->code.ns = env_resolve_constant(env, config_setting_get_member(setting, "namespace")); proto->code.major = env_resolve_constant(env, config_setting_get_member(setting, "major")); proto->code.minor = env_resolve_constant(env, config_setting_get_member(setting, "minor")); proto->payload = env_resolve_constant(env, config_setting_get_member(setting, "payload")); // TODO modifiers } static GraphNode * create(GraphNodeSpecification * spec, GraphNodeConfig * config, InitializationEnvironment * env) { WindowGraphNode * node = T_ALLOC(1, WindowGraphNode); if (!node) { return NULL; } bool is_jumping = false; ssize_t additional_step = 0; long long max_length = 0; long long max_milliseconds = 0; EventData terminator = {.code = {0, 0, 0}, .payload = 0, .priority = 10, .modifiers = EMPTY_MODIFIER_SET, .time = {}}; bool has_max_length = false, has_max_time = false, has_terminator = false; if (config->options) { is_jumping = env_resolve_constant(env, config_setting_get_member(config->options, "is_jumping")) != 0; additional_step = (ssize_t) env_resolve_constant(env, config_setting_get_member(config->options, "additional_step")); if (additional_step < 0) { additional_step = 0; } const config_setting_t *setting; if ((setting = config_setting_get_member(config->options, "max_length"))) { has_max_length = true; max_length = env_resolve_constant(env, setting); if (max_length < 0) { max_length = 0; } if (max_length > INT32_MAX) { max_length = 0; has_max_length = false; } } if ((setting = config_setting_get_member(config->options, "max_milliseconds"))) { has_max_time = true; max_milliseconds = env_resolve_constant(env, setting); if (max_milliseconds < 0) { max_milliseconds = 0; } } if ((setting = config_setting_get_member(config->options, "terminator"))) { has_terminator = true; load_event_prototype(env, setting, &terminator); } } *node = (WindowGraphNode) { .as_GraphNode = { .as_EventPositionBase = { .handle_event = &handle_event, .waiting_new_event = false, }, .specification = spec, .inputs = EMPTY_GRAPH_CHANNEL_LIST, .outputs = EMPTY_GRAPH_CHANNEL_LIST, }, .terminator_prototype = terminator, .has_terminator = has_terminator, .is_jumping = is_jumping, .has_max_time = has_max_time, .has_max_length = has_max_length, .max_time = relative_time_from_millisecond(max_milliseconds), .max_length = max_length, .additional_step = additional_step, .skip_next = 0, .buffer = EMPTY_QUEUE, .buffered_set = {}, }; hash_table_init(&node->buffered_set, NULL); return &node->as_GraphNode; } static void destroy (GraphNodeSpecification * self, GraphNode * target) { (void) self; WindowGraphNode * node = DOWNCAST(WindowGraphNode, GraphNode, target); modifier_set_destruct(&node->terminator_prototype.modifiers); queue_deinit(&node->buffer); hash_table_deinit(&node->buffered_set); free(target); } GraphNodeSpecification nodespec_window = (GraphNodeSpecification) { .create = &create, .destroy = &destroy, .register_io = NULL, .name = "window", .documentation = "Passes events through while copying them into an internal buffer, when buffer buffer.length or (buffer.last.time - buffer.first.time) thresholds are met optionally sends terminator event, rewinds events to buffer start, skips ((is_jumping ? buffer.length : 1) + additional_step) events, retransmits remaining buffered events and starts over\nAccepts events on any connector\nSends events on all connectors" "\nOption 'is_jumping' (optional): whether to send events at most once" "\nOption 'additional_step' (optional): natural number --- additional step relative to a regular sliding/jumping window" "\nOption 'max_length' (optional): natural number --- maximum number of events in a window" "\nOption 'max_milliseconds' (optional): natural number --- maximum number milliseconds between the first and the last event in a window" "\nOption 'terminator' (optional): event to send after the window fullness condition is met:" "\n\tField 'namespace' (optional): set generated event code namespace" "\n\tField 'major' (optional): set generated event code major" "\n\tField 'minor' (optional): set generated event code minor" "\n\tField 'payload' (optional): set generated event payload" , }; MODULE_CONSTRUCTOR(init) { register_graph_node_specification(&nodespec_window); }