From a769bbed6d0d9dcbc5bb6f0f66dcbdfb8fa1ab0c Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Sat, 8 Feb 2025 10:08:26 -0600 Subject: port-serial: add serial port command scheduler Add an interface and implementation for a port scheduler that round- robins between ports the scheduler is attached to, serializing command execution among one or more MMPortSerial instances. Theory of operation: Sources (e.g. MMPort subclasses) register themselves with the scheduler. Each source notifies the scheduler whenever its command queue depth changes, for example when new commands are submitted, when commands are completed, or when commands are canceled. The scheduler will round-robin between all sources with pending commands, sleeping when there are no pending commands from any source. For each source with a pending command the scheduler will emit the 'send-command' signal with that source's ID. The given source should send the next command in its queue to the modem. When that command is finished (either successfully or with an error/timeout) the source must call mm_port_scheduler_notify_command_done() to notify the scheduler that it may advance to the next source with a pending command, if any. If the 'send-command' signal and the notify_command_done() call are not balanced the scheduler may stall. Signed-off-by: Dan Williams --- src/mm-port-serial.c | 118 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 108 insertions(+), 10 deletions(-) (limited to 'src/mm-port-serial.c') diff --git a/src/mm-port-serial.c b/src/mm-port-serial.c index 8b9bef98..d1976972 100644 --- a/src/mm-port-serial.c +++ b/src/mm-port-serial.c @@ -36,6 +36,8 @@ #include "mm-port-serial.h" #include "mm-log-object.h" #include "mm-helper-enums-types.h" +#include "mm-port-scheduler.h" +#include "mm-port-scheduler-rr.h" static gboolean port_serial_queue_process (gpointer data); static void port_serial_schedule_queue_process (MMPortSerial *self, @@ -59,6 +61,7 @@ enum { PROP_FD, PROP_SPEW_CONTROL, PROP_FLASH_OK, + PROP_SCHEDULER, LAST_PROP }; @@ -81,6 +84,10 @@ struct _MMPortSerialPrivate { GQueue *queue; GByteArray *response; + /* Command scheduler */ + MMPortScheduler *scheduler; + guint scheduler_send_id; + /* For real ports, iochannel, and we implement the eagain limit */ GIOChannel *iochannel; guint iochannel_id; @@ -89,7 +96,6 @@ struct _MMPortSerialPrivate { GSocket *socket; GSource *socket_source; - guint baud; guint bits; char parity; @@ -194,8 +200,9 @@ mm_port_serial_command (MMPortSerial *self, else g_queue_push_tail (self->priv->queue, task); - if (g_queue_get_length (self->priv->queue) == 1) - port_serial_schedule_queue_process (self, 0); + mm_port_scheduler_notify_num_pending (self->priv->scheduler, + self, + g_queue_get_length (self->priv->queue)); } /*****************************************************************************/ @@ -737,10 +744,11 @@ port_serial_got_response (MMPortSerial *self, } g_object_unref (task); - } - if (!g_queue_is_empty (self->priv->queue)) - port_serial_schedule_queue_process (self, 0); + mm_port_scheduler_notify_command_done (self->priv->scheduler, + self, + g_queue_get_length (self->priv->queue)); + } } g_object_unref (self); g_clear_error (&error); @@ -805,6 +813,7 @@ port_serial_queue_process (gpointer data) GCancellable *cancellable; GError *error = NULL; + g_assert (self->priv->timeout_id == 0); self->priv->queue_id = 0; task = g_queue_peek_head (self->priv->queue); @@ -877,6 +886,21 @@ port_serial_queue_process (gpointer data) return G_SOURCE_REMOVE; } +static void +scheduler_send_command (MMPortScheduler *scheduler, + gpointer source, + gpointer user_data) +{ + MMPortSerial *self = MM_PORT_SERIAL (user_data); + + /* Must be for us */ + if (source == self) { + g_assert (self->priv->queue_id == 0); + g_assert (self->priv->timeout_id == 0); + port_serial_queue_process (self); + } +} + static void parse_response_buffer (MMPortSerial *self) { @@ -1444,6 +1468,8 @@ _close_internal (MMPortSerial *self, gboolean force) } g_queue_clear (self->priv->queue); + mm_port_scheduler_notify_num_pending (self->priv->scheduler, self, 0); + if (self->priv->timeout_id) { g_source_remove (self->priv->timeout_id); self->priv->timeout_id = 0; @@ -1889,6 +1915,35 @@ mm_port_serial_get_flow_control (MMPortSerial *self) /*****************************************************************************/ +static void +scheduler_setup (MMPortSerial *self, MMPortScheduler *scheduler) +{ + self->priv->scheduler = scheduler; + if (self->priv->scheduler) { + const gchar *port_name; + + port_name = mm_port_get_device (MM_PORT (self)); + mm_port_scheduler_register_source (self->priv->scheduler, self, port_name); + self->priv->scheduler_send_id = g_signal_connect (self->priv->scheduler, + MM_PORT_SCHEDULER_SIGNAL_SEND_COMMAND, + G_CALLBACK (scheduler_send_command), + self); + } +} + +static void +scheduler_cleanup (MMPortSerial *self) +{ + if (self->priv->scheduler) { + mm_port_scheduler_unregister_source (self->priv->scheduler, self); + g_signal_handler_disconnect (self->priv->scheduler, self->priv->scheduler_send_id); + self->priv->scheduler_send_id = 0; + } + g_clear_object (&self->priv->scheduler); +} + +/*****************************************************************************/ + MMPortSerial * mm_port_serial_new (const char *name, MMPortType ptype) { @@ -1961,6 +2016,16 @@ mm_port_serial_init (MMPortSerial *self) self->priv->response = g_byte_array_sized_new (500); } +static void +constructed (GObject *object) +{ + MMPortSerial *self = MM_PORT_SERIAL (object); + + /* Add a default scheduler if none was set via GObject properties */ + if (!self->priv->scheduler) + scheduler_setup (self, MM_PORT_SCHEDULER (mm_port_scheduler_rr_new ())); +} + static void set_property (GObject *object, guint prop_id, @@ -1997,6 +2062,10 @@ set_property (GObject *object, case PROP_FLASH_OK: self->priv->flash_ok = g_value_get_boolean (value); break; + case PROP_SCHEDULER: + scheduler_cleanup (self); + scheduler_setup (self, g_value_dup_object (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2039,6 +2108,9 @@ get_property (GObject *object, case PROP_FLASH_OK: g_value_set_boolean (value, self->priv->flash_ok); break; + case PROP_SCHEDULER: + g_value_set_object (value, self->priv->scheduler); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2046,7 +2118,7 @@ get_property (GObject *object, } static void -finalize (GObject *object) +dispose (GObject *object) { MMPortSerial *self = MM_PORT_SERIAL (object); @@ -2059,14 +2131,30 @@ finalize (GObject *object) g_assert (self->priv->socket == NULL); g_assert (self->priv->socket_source == NULL); - if (self->priv->timeout_id) + if (self->priv->timeout_id) { g_source_remove (self->priv->timeout_id); + self->priv->timeout_id = 0; + } - if (self->priv->queue_id) + if (self->priv->queue_id) { g_source_remove (self->priv->queue_id); + self->priv->queue_id = 0; + } - g_hash_table_destroy (self->priv->reply_cache); g_byte_array_unref (self->priv->response); + self->priv->response = NULL; + + scheduler_cleanup (self); + + G_OBJECT_CLASS (mm_port_serial_parent_class)->dispose (object); +} + +static void +finalize (GObject *object) +{ + MMPortSerial *self = MM_PORT_SERIAL (object); + + g_hash_table_destroy (self->priv->reply_cache); g_queue_free (self->priv->queue); G_OBJECT_CLASS (mm_port_serial_parent_class)->finalize (object); @@ -2080,8 +2168,10 @@ mm_port_serial_class_init (MMPortSerialClass *klass) g_type_class_add_private (object_class, sizeof (MMPortSerialPrivate)); /* Virtual methods */ + object_class->constructed = constructed; object_class->set_property = set_property; object_class->get_property = get_property; + object_class->dispose = dispose; object_class->finalize = finalize; klass->config_fd = real_config_fd; @@ -2161,6 +2251,14 @@ mm_port_serial_class_init (MMPortSerialClass *klass) TRUE, G_PARAM_READWRITE)); + g_object_class_install_property + (object_class, PROP_SCHEDULER, + g_param_spec_object (MM_PORT_SERIAL_SCHEDULER, + "Scheduler", + "Command scheduler object (optional)", + MM_TYPE_PORT_SCHEDULER, + G_PARAM_READWRITE)); + /* Signals */ signals[BUFFER_FULL] = g_signal_new ("buffer-full", -- cgit v1.2.3-70-g09d2