/*
 * struct suspension
 *
 *	Set of edge-triggered events, any one of which a thread may
 *	wait for.
 */
struct suspension {
	kmutex_t	s_lock;
	kcondvar_t	s_cv;
	struct event	*s_event;
	int		s_flags;
};

/*
 * struct event
 *
 *	State for an edge-triggered event that a thread may be waiting
 *	for.
 */
struct event {
	struct suspension	*e_suspension;
};

/*
 * suspension_init(S, wmesg, flags)
 *
 *	Prepare for the current thread to wait for any of a collection
 *	of events.  Caller must have exclusive access to S.
 */
void
suspension_init(struct suspension *S, const char *wmesg, int flags)
{

	mutex_init(&S->s_lock, MUTEX_DEFAULT, IPL_SCHED); /* XXX IPL_SCHED? */
	cv_init(&S->s_cv, wmesg);
	S->s_event = NULL;
	S->s_flags = flags;
}

/*
 * suspension_destroy(S)
 *
 *	Done with a suspension.
 */
void
suspension_destroy(struct suspension *S)
{

	cv_destroy(&S->s_cv);
	mutex_destroy(&S->s_lock);
}

/*
 * suspend(S)
 *
 *	Wait until any of the events subscribed to S has been
 *	notified, and return the first such event.
 */
struct event *
suspend(struct suspension *S)
{
	struct event *E;

	mutex_enter(&S->s_lock);
	while ((E = S->s_event) == NULL)
		cv_wait(&S->s_cv, &S->s_lock);
	mutex_exit(&S->s_lock);

	return E;
}

/*
 * event_subscribe(E, S)
 *
 *	Arrange that event_notify(E) will wake suspend(S).
 */
void
event_subscribe(struct event *E, struct suspension *S)
{

	E->e_suspension = S;
}

/*
 * event_done(E, S)
 *
 *	Done with the event E subscribed to suspension S.
 */
void
event_done(struct event *E, struct suspension *S)
{

	KASSERT(E->e_suspension == S);
	/* ...? */
}

/*
 * event_notify(E)
 *
 *	If E was initialized with event_subscribe(E, S), cause any
 *	pending call to suspend(S) to complete.  Return true if this
 *	was the first event_notify for S, false if not.  Thus the
 *	notifier can learn whether its event was the one consumed by S
 *	or not, e.g. if it was to pass a scarce resource to S.
 *
 *	XXX What happens if the owner of S had not yet called
 *	suspend(S), and was about to try to allocate a different
 *	scarce resource instead, e.g. thread A is in pool_get and
 *	about to vmem_alloc, but thread B calls pool_put?  Should we
 *	avoid the vmem_alloc?  In Concurrent ML, there are two passes
 *	over every source of events: one to poll, and one to set up
 *	blocking.  Does this matter?
 *
 *	Yes, this matters: event_notify _cannot_ report consumption
 *	(e.g., of a pool_put) if the thread that would have suspended
 *	is about to consume something else (e.g., vmem_alloc) instead.
 */
bool
event_notify(struct event *E)
{
	struct suspension *S = E->e_suspension;
	bool consumed;

	mutex_enter(&S->s_lock);
	if (S->s_event == NULL) {
		S->s_event = E;
		consumed = true;
	} else {
		consumed = false;
	}
	cv_signal(&S->s_cv);
	mutex_exit(&S->s_lock);

	return consumed;
}
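/*
 * Usage sketch (illustration only, not part of the API above): a
 * thread that is satisfied by whichever of two event sources fires
 * first.  foo_subscribe/foo_done and bar_subscribe/bar_done are
 * hypothetical per-subsystem wrappers in the style of the
 * pool_event_* and vmem_event_* routines below, each embedding a
 * struct event and calling event_subscribe/event_done on it; locking
 * of each source is elided.
 */
struct foo_event { struct event fe_event; /* ... */ };
struct bar_event { struct event be_event; /* ... */ };

static void
example_wait_foo_or_bar(void)
{
	struct suspension suspension;
	struct foo_event fe;
	struct bar_event be;
	struct event *E;

	suspension_init(&suspension, "fooorbar", 0);
	foo_subscribe(&fe, &suspension);
	bar_subscribe(&be, &suspension);

	/* Sleep until foo or bar calls event_notify, whichever is first.  */
	E = suspend(&suspension);

	if (E == &fe.fe_event) {
		/* consume whatever foo handed us */
	} else {
		KASSERT(E == &be.be_event);
		/* consume whatever bar handed us */
	}

	/* Tear down both subscriptions, fired or not.  */
	foo_done(&fe, &suspension);
	bar_done(&be, &suspension);
	suspension_destroy(&suspension);
}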
/* pool example: if pool_get is waiting, pool_put hands off to it */

struct pool {
	...
	/* XXX Priority queue, ordered by thread priority? */
	TAILQ_HEAD(, pool_event)	pr_waiters;
	TAILQ_HEAD(, pool_event)	pr_donewaiters;
	...
};

struct pool_event {
	struct event		pe_event;
	TAILQ_ENTRY(pool_event)	pe_entry;
	void			*pe_obj;
};

void *
pool_get(struct pool *pp, int flags)
{
	struct suspension suspension, *S;
	void *obj = NULL;

	if (ISSET(flags, PR_WAITOK)) {
		S = &suspension;
		suspension_init(S, pp->pr_wchan, 0);
	} else {
		S = NULL;
	}

	mutex_enter(&pp->pr_lock);
startover:
	...
	if ((ph = pp->pr_curpage) == NULL) {
		struct pool_event pe;
		int error;

		if (ISSET(flags, PR_WAITOK))
			pool_event_subscribe(pp, &pe, S);
		error = pool_grow(pp, flags, S);
		if (ISSET(flags, PR_WAITOK)) {
			if ((obj = pool_event_done(pp, &pe, S)) != NULL) {
				/*
				 * XXX Should we shrink the pool if we
				 * had grown it in this case?
				 */
				goto out;
			}
		}
		if (error) {
			if (pp->pr_curpage != NULL)
				goto startover;
			pp->pr_nfail++;
			goto out;
		}
	}
	...
out:
	mutex_exit(&pp->pr_lock);
	if (ISSET(flags, PR_WAITOK))
		suspension_destroy(S);
	return obj;
}

void
pool_put(struct pool *pp, void *obj)
{
	struct pool_event *pe;
	struct pool_pagelist pq;

	mutex_enter(&pp->pr_lock);

	/* If anyone is waiting for an object, try to hand this one off.  */
	while ((pe = TAILQ_FIRST(&pp->pr_waiters)) != NULL) {
		TAILQ_REMOVE(&pp->pr_waiters, pe, pe_entry);
		TAILQ_INSERT_TAIL(&pp->pr_donewaiters, pe, pe_entry);
		if (pool_event_notify(pp, pe, obj))
			goto out;
	}

	/* Nobody took it; return the object to the pool as usual.  */
	LIST_INIT(&pq);
	pool_do_put(pp, obj, &pq);
out:
	mutex_exit(&pp->pr_lock);
	if (pe == NULL)
		pr_pagelist_free(pp, &pq);
}

static void
pool_event_subscribe(struct pool *pp, struct pool_event *pe,
    struct suspension *S)
{
	static const struct pool_event zero_pe;

	KASSERT(mutex_owned(&pp->pr_lock));

	*pe = zero_pe;
	TAILQ_INSERT_TAIL(&pp->pr_waiters, pe, pe_entry);
	event_subscribe(&pe->pe_event, S);
}

static void *
pool_event_done(struct pool *pp, struct pool_event *pe, struct suspension *S)
{
	void *obj;

	KASSERT(mutex_owned(&pp->pr_lock));

	/* XXX Remove from pr_waiters or pr_donewaiters, as appropriate.  */
	TAILQ_REMOVE(..., pe, pe_entry);
	obj = pe->pe_obj;
	pe->pe_obj = NULL;
	event_done(&pe->pe_event, S);

	return obj;
}

static bool
pool_event_notify(struct pool *pp, struct pool_event *pe, void *obj)
{

	KASSERT(mutex_owned(&pp->pr_lock));

	if (!event_notify(&pe->pe_event)) {
		/*
		 * The thread waiting for this event has already been
		 * notified of another event and so is no longer
		 * interested in recycling this object.
		 */
		return false;
	}

	/*
	 * Pass this object along to the thread that was waiting for
	 * the event of a free object.
	 */
	pe->pe_obj = obj;
	return true;
}

/* vmem example: if vmem_alloc must sleep, let other events wake the caller */

struct vmem {
	...
	TAILQ_HEAD(, vmem_event)	vm_waiters;
	TAILQ_HEAD(, vmem_event)	vm_donewaiters;
	...
};

struct vmem_event {
	struct event		vme_event;
	TAILQ_ENTRY(vmem_event)	vme_entry;
};

int
vmem_xalloc(struct vmem *vm, ..., struct suspension *S)
{

	KASSERT(ISSET(flags, VM_SLEEP) == (S != NULL));
	...
	if (ISSET(flags, VM_SLEEP)) {
		struct vmem_event vme;
		bool retry;

		vmem_event_subscribe(vm, &vme, S);
		retry = (suspend(S) == &vme.vme_event);
		vmem_event_done(vm, &vme, S);
		if (retry)
			goto retry;
		/*
		 * Something else higher up in the stack happened
		 * (first).  Tell the caller ENOMEM here so they know
		 * to use another event instead.
		 */
	}
fail:
	...
	return ENOMEM;
}

void
vmem_xfree(struct vmem *vm, ...)
{
	struct vmem_event *vme;

	...
	/* Notify every waiter; any of them may now retry its allocation.  */
	while ((vme = TAILQ_FIRST(&vm->vm_waiters)) != NULL) {
		TAILQ_REMOVE(&vm->vm_waiters, vme, vme_entry);
		TAILQ_INSERT_TAIL(&vm->vm_donewaiters, vme, vme_entry);
		vmem_event_notify(vm, vme);
	}
	...
}

static void
vmem_event_subscribe(struct vmem *vm, struct vmem_event *vme,
    struct suspension *S)
{
	static const struct vmem_event zero_vme;

	VMEM_ASSERT_LOCKED(vm);

	*vme = zero_vme;
	TAILQ_INSERT_TAIL(&vm->vm_waiters, vme, vme_entry);
	event_subscribe(&vme->vme_event, S);
}

static void
vmem_event_done(struct vmem *vm, struct vmem_event *vme, struct suspension *S)
{

	VMEM_ASSERT_LOCKED(vm);

	event_done(&vme->vme_event, S);
	/* XXX Remove from vm_waiters or vm_donewaiters, as appropriate.  */
	TAILQ_REMOVE(..., vme, vme_entry);
}

static void
vmem_event_notify(struct vmem *vm, struct vmem_event *vme)
{

	VMEM_ASSERT_LOCKED(vm);

	(void)event_notify(&vme->vme_event);
}
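/*
 * Glue sketch (assumption, not existing code): a pool_grow that
 * threads the caller's suspension down into the backing allocator,
 * matching the pool_grow(pp, flags, S) call in pool_get above.  The
 * point is that while the thread sleeps inside vmem_xalloc waiting
 * for kva, a concurrent pool_put can still notify the pool_event
 * subscribed in pool_get and hand it an object directly.  The arena
 * and the elided arguments are schematic.
 */
static int
pool_grow(struct pool *pp, int flags, struct suspension *S)
{
	struct vmem *vm = ...;	/* the pool's backing kva arena (schematic) */
	int error;

	KASSERT(mutex_owned(&pp->pr_lock));
	KASSERT(ISSET(flags, PR_WAITOK) == (S != NULL));

	mutex_exit(&pp->pr_lock);

	/*
	 * Ask the backing arena for a new page.  The elided arguments
	 * include VM_SLEEP iff S != NULL, matching the KASSERT in
	 * vmem_xalloc above.  (The real path would go through the
	 * pool's allocator hooks rather than calling vmem directly.)
	 */
	error = vmem_xalloc(vm, ..., S);

	mutex_enter(&pp->pr_lock);
	if (error) {
		/*
		 * Either the arena is exhausted, or some other event
		 * on S fired first (e.g. a pool_put); pool_get will
		 * find out which from pool_event_done.
		 */
		return error;
	}
	...	/* initialize the new page and add it to the pool */
	return 0;
}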
/* uvm example: wait for the page daemon to free enough pages */

struct uvm_free_event {
	struct event			ufe_event;
	PRIOQ_ENTRY(uvm_free_event)	ufe_entry;
	unsigned long			ufe_npages;
};

PRIOQ_HEAD(, uvm_free_event) uvm_free_waiters;

void
uvm_free_subscribe(unsigned long npages, struct uvm_free_event *ufe,
    struct suspension *S)
{
	static const struct uvm_free_event zero_ufe;

	KASSERT(mutex_owned(&uvm_fpageqlock));

	*ufe = zero_ufe;
	ufe->ufe_npages = npages;
	PRIOQ_INSERT(&uvm_free_waiters, ufe, ufe_entry);
	event_subscribe(&ufe->ufe_event, S);
	uvm_kick_pagedaemon();
}

void
uvm_wait(unsigned long npages, struct suspension *S)
{
	struct uvm_free_event ufe;

	mutex_spin_enter(&uvm_fpageqlock);
	uvm_free_subscribe(npages, &ufe, S);
	mutex_spin_exit(&uvm_fpageqlock);

	/*
	 * suspend blocks on an adaptive lock and may sleep, so drop
	 * the spin lock first; the notification is latched in S, so
	 * no wakeup can be lost in between.
	 */
	(void)suspend(S);

	/*
	 * XXX Need a uvm_free_done to pull &ufe off uvm_free_waiters
	 * and call event_done before ufe goes out of scope.
	 */
}
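/*
 * Glue sketch (hypothetical, not existing code): the bottom of the
 * chain.  A kva or page import path that comes up empty waits with
 * the caller's suspension via uvm_wait instead of sleeping
 * privately, so the sleeping thread can also be satisfied by any
 * other event subscribed higher up the stack (a pool_put, a
 * vmem_xfree).  On wakeup it simply reports ENOMEM and unwinds; the
 * layers above decide what actually fired, as in vmem_xalloc's
 * suspend(S) == &vme.vme_event test and pool_get's pool_event_done.
 * foo_import and its elided arguments are illustrative only.
 */
static int
foo_import(..., unsigned long npages, vm_flag_t flags, struct suspension *S)
{

	KASSERT(ISSET(flags, VM_SLEEP) == (S != NULL));

	...	/* try to allocate npages; on success, return 0 */

	if (S == NULL)
		return ENOMEM;	/* caller cannot sleep */

	/*
	 * Wait until the page daemon frees enough pages, or until any
	 * other event on S fires.  Either way report ENOMEM and let
	 * the caller decide: retry here, or unwind and consume
	 * whichever event S recorded instead.
	 */
	uvm_wait(npages, S);
	return ENOMEM;
}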