diff options
author | Dan Williams <dan@ioncontrol.co> | 2025-02-08 10:08:26 -0600 |
---|---|---|
committer | Dan Williams <dan@ioncontrol.co> | 2025-05-23 18:46:53 -0500 |
commit | a769bbed6d0d9dcbc5bb6f0f66dcbdfb8fa1ab0c (patch) | |
tree | 289b86fd8dc62124ad162e60f3efea35524bdf65 /src/mm-port-serial.c | |
parent | b24615d8018c7cd78a744db0372c09c07543b763 (diff) |
port-serial: add serial port command scheduler
Add an interface and implementation for a port scheduler that round-
robins between ports the scheduler is attached to, serializing command
execution among one or more MMPortSerial instances.
Theory of operation:
Sources (e.g. MMPort subclasses) register themselves with the scheduler.
Each source notifies the scheduler whenever its command queue depth changes,
for example when new commands are submitted, when commands are completed,
or when commands are canceled.
The scheduler will round-robin between all sources with pending commands,
sleeping when there are no pending commands from any source.
For each source with a pending command the scheduler will emit the
'send-command' signal with that source's ID. The given source should
send the next command in its queue to the modem.
When that command is finished (either successfully or with an error/timeout)
the source must call mm_port_scheduler_notify_command_done() to notify the
scheduler that it may advance to the next source with a pending command, if
any. If the 'send-command' signal and the notify_command_done() call are not
balanced the scheduler may stall.
Signed-off-by: Dan Williams <dan@ioncontrol.co>
Diffstat (limited to 'src/mm-port-serial.c')
-rw-r--r-- | src/mm-port-serial.c | 118 |
1 files changed, 108 insertions, 10 deletions
diff --git a/src/mm-port-serial.c b/src/mm-port-serial.c index 8b9bef98..d1976972 100644 --- a/src/mm-port-serial.c +++ b/src/mm-port-serial.c @@ -36,6 +36,8 @@ #include "mm-port-serial.h" #include "mm-log-object.h" #include "mm-helper-enums-types.h" +#include "mm-port-scheduler.h" +#include "mm-port-scheduler-rr.h" static gboolean port_serial_queue_process (gpointer data); static void port_serial_schedule_queue_process (MMPortSerial *self, @@ -59,6 +61,7 @@ enum { PROP_FD, PROP_SPEW_CONTROL, PROP_FLASH_OK, + PROP_SCHEDULER, LAST_PROP }; @@ -81,6 +84,10 @@ struct _MMPortSerialPrivate { GQueue *queue; GByteArray *response; + /* Command scheduler */ + MMPortScheduler *scheduler; + guint scheduler_send_id; + /* For real ports, iochannel, and we implement the eagain limit */ GIOChannel *iochannel; guint iochannel_id; @@ -89,7 +96,6 @@ struct _MMPortSerialPrivate { GSocket *socket; GSource *socket_source; - guint baud; guint bits; char parity; @@ -194,8 +200,9 @@ mm_port_serial_command (MMPortSerial *self, else g_queue_push_tail (self->priv->queue, task); - if (g_queue_get_length (self->priv->queue) == 1) - port_serial_schedule_queue_process (self, 0); + mm_port_scheduler_notify_num_pending (self->priv->scheduler, + self, + g_queue_get_length (self->priv->queue)); } /*****************************************************************************/ @@ -737,10 +744,11 @@ port_serial_got_response (MMPortSerial *self, } g_object_unref (task); - } - if (!g_queue_is_empty (self->priv->queue)) - port_serial_schedule_queue_process (self, 0); + mm_port_scheduler_notify_command_done (self->priv->scheduler, + self, + g_queue_get_length (self->priv->queue)); + } } g_object_unref (self); g_clear_error (&error); @@ -805,6 +813,7 @@ port_serial_queue_process (gpointer data) GCancellable *cancellable; GError *error = NULL; + g_assert (self->priv->timeout_id == 0); self->priv->queue_id = 0; task = g_queue_peek_head (self->priv->queue); @@ -878,6 +887,21 @@ port_serial_queue_process (gpointer data) } static void +scheduler_send_command (MMPortScheduler *scheduler, + gpointer source, + gpointer user_data) +{ + MMPortSerial *self = MM_PORT_SERIAL (user_data); + + /* Must be for us */ + if (source == self) { + g_assert (self->priv->queue_id == 0); + g_assert (self->priv->timeout_id == 0); + port_serial_queue_process (self); + } +} + +static void parse_response_buffer (MMPortSerial *self) { GError *error = NULL; @@ -1444,6 +1468,8 @@ _close_internal (MMPortSerial *self, gboolean force) } g_queue_clear (self->priv->queue); + mm_port_scheduler_notify_num_pending (self->priv->scheduler, self, 0); + if (self->priv->timeout_id) { g_source_remove (self->priv->timeout_id); self->priv->timeout_id = 0; @@ -1889,6 +1915,35 @@ mm_port_serial_get_flow_control (MMPortSerial *self) /*****************************************************************************/ +static void +scheduler_setup (MMPortSerial *self, MMPortScheduler *scheduler) +{ + self->priv->scheduler = scheduler; + if (self->priv->scheduler) { + const gchar *port_name; + + port_name = mm_port_get_device (MM_PORT (self)); + mm_port_scheduler_register_source (self->priv->scheduler, self, port_name); + self->priv->scheduler_send_id = g_signal_connect (self->priv->scheduler, + MM_PORT_SCHEDULER_SIGNAL_SEND_COMMAND, + G_CALLBACK (scheduler_send_command), + self); + } +} + +static void +scheduler_cleanup (MMPortSerial *self) +{ + if (self->priv->scheduler) { + mm_port_scheduler_unregister_source (self->priv->scheduler, self); + g_signal_handler_disconnect (self->priv->scheduler, self->priv->scheduler_send_id); + self->priv->scheduler_send_id = 0; + } + g_clear_object (&self->priv->scheduler); +} + +/*****************************************************************************/ + MMPortSerial * mm_port_serial_new (const char *name, MMPortType ptype) { @@ -1962,6 +2017,16 @@ mm_port_serial_init (MMPortSerial *self) } static void +constructed (GObject *object) +{ + MMPortSerial *self = MM_PORT_SERIAL (object); + + /* Add a default scheduler if none was set via GObject properties */ + if (!self->priv->scheduler) + scheduler_setup (self, MM_PORT_SCHEDULER (mm_port_scheduler_rr_new ())); +} + +static void set_property (GObject *object, guint prop_id, const GValue *value, @@ -1997,6 +2062,10 @@ set_property (GObject *object, case PROP_FLASH_OK: self->priv->flash_ok = g_value_get_boolean (value); break; + case PROP_SCHEDULER: + scheduler_cleanup (self); + scheduler_setup (self, g_value_dup_object (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2039,6 +2108,9 @@ get_property (GObject *object, case PROP_FLASH_OK: g_value_set_boolean (value, self->priv->flash_ok); break; + case PROP_SCHEDULER: + g_value_set_object (value, self->priv->scheduler); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -2046,7 +2118,7 @@ get_property (GObject *object, } static void -finalize (GObject *object) +dispose (GObject *object) { MMPortSerial *self = MM_PORT_SERIAL (object); @@ -2059,14 +2131,30 @@ finalize (GObject *object) g_assert (self->priv->socket == NULL); g_assert (self->priv->socket_source == NULL); - if (self->priv->timeout_id) + if (self->priv->timeout_id) { g_source_remove (self->priv->timeout_id); + self->priv->timeout_id = 0; + } - if (self->priv->queue_id) + if (self->priv->queue_id) { g_source_remove (self->priv->queue_id); + self->priv->queue_id = 0; + } - g_hash_table_destroy (self->priv->reply_cache); g_byte_array_unref (self->priv->response); + self->priv->response = NULL; + + scheduler_cleanup (self); + + G_OBJECT_CLASS (mm_port_serial_parent_class)->dispose (object); +} + +static void +finalize (GObject *object) +{ + MMPortSerial *self = MM_PORT_SERIAL (object); + + g_hash_table_destroy (self->priv->reply_cache); g_queue_free (self->priv->queue); G_OBJECT_CLASS (mm_port_serial_parent_class)->finalize (object); @@ -2080,8 +2168,10 @@ mm_port_serial_class_init (MMPortSerialClass *klass) g_type_class_add_private (object_class, sizeof (MMPortSerialPrivate)); /* Virtual methods */ + object_class->constructed = constructed; object_class->set_property = set_property; object_class->get_property = get_property; + object_class->dispose = dispose; object_class->finalize = finalize; klass->config_fd = real_config_fd; @@ -2161,6 +2251,14 @@ mm_port_serial_class_init (MMPortSerialClass *klass) TRUE, G_PARAM_READWRITE)); + g_object_class_install_property + (object_class, PROP_SCHEDULER, + g_param_spec_object (MM_PORT_SERIAL_SCHEDULER, + "Scheduler", + "Command scheduler object (optional)", + MM_TYPE_PORT_SCHEDULER, + G_PARAM_READWRITE)); + /* Signals */ signals[BUFFER_FULL] = g_signal_new ("buffer-full", |