Dando um tempo com Exclusão Mútua
Vamos desacelerar e ver algo mais leve dessa vez, trabalhando com Locks!

Depois do post anterior, é hora de dar uma respirada para apaziguar os ânimos (autocrítica). Do ponto que estamos, acredito que a parte mais fácil é agora implementar o lock que as threads usarão, como exigido pelo P2. Mas afinal, o que é para ser feito? Na página do P2 é solicitado:
Mutual exclusion: What's your plan for implementing mutual exclusion?
Certo, antes de entrar nos meandros, vale fazer uma breve recapitulação. Longe de querer que esse post seja uma referência em sistemas concorrentes, ele não é, há milhares de outros materiais melhores para isso. Mas para garantirmos estar na mesma página nessa discussão, é válido estabelecer alguns pontos.
Temos várias tarefas concorrentes executando no mesmo processador e, por enquanto, acessando a mesma área de endereçamento na memória. Em especial as threads de kernel, elas acessam podem acessar as mesmas variáveis, e devido ao comportamento aleatório que as tarefas executam, a ordem das operações pode alterar o resultado.
Pior, em sistemas verdadeiramente preemptivos (não é o nosso caso. AINDA), é possível que uma tarefa seja interrompida bem no momento do acesso a uma área importante da memória, mas o dado já foi lido para o registrador. A outra tarefa pode manipular essa região de memória, e ao retornar para a tarefa anterior, ela está com um valor desatualizado no registrador e irá processar informações de forma errada.
Acredite em mim, fazer depuração em sistemas concorrentes é um pesadelo, pois é pouco provável que tenhamos algum controle sobre o escalonamento das tarefas, que é feito pelo Sistema Operacional.
Mesmo que aqui NÓS somos o Sistema Operacional (Yay!), ainda assim é uma tarefa difícil, pois em teoria o algoritmo de escalonamento está certo. O que pode estar errado é a forma como você programou suas threads, de forma que elas acessam a memória compartilhada entre elas de forma anárquica.
E olha que legal, mesmo estando trabalhando com um kernel não preemptivo neste post, ainda assim precisamos de primitiva de sincronização para podermos forçar determinada ordem de execução das threads.
A forma como você implementa exclusão mútua para evitar condições de corrida no kernel do Linux, por exemplo, dependem de quanto tempo você vai passar na região crítica. Para curtíssimos tempos, um spinlock é suficiente. Se você levar um tempo considerável, talvez seja interessante bloquear a thread e colocá-la para dormir, até que o lock seja resolvido.
Vamos retomar alguns conceitos primeiro. Não seremos aderentes a conceitos e nomenclaturas científicas, esse não é o objetivo. O objetivo aqui é dar uma ideia do problema e como podemos resolver para o nosso caso, e talvez conhecer alguns outros mais.
Região Crítica
A região crítica é aquela parte do código ou área de memória em que devem ser acessadas exclusivamente, em um determinado tempo, por uma tarefa, e somente por ela. É uma arte definir exatamente as regiões críticas de um código e sua extensão, pois se mais de uma tarefa está tentando acessar a região crítica, se ela for longa demais, as outras tarefas vão ficar esperando.
No livro Linux Kernel Development, Robert Love é claro em dizer que você deve proteger os dados (áreas de memória compartilhada) e não os fluxos de execução, ou seja, seu código. Tenha isso em mente ao projetar as regiões críticas de seu código
Condição de Corrida
Se mais de uma thread conseguir acessar uma região crítica, elas estarão em condição de corrida. Desse ponto em diante, é difícil prever o comportamento das tarefas, afinal, elas não deveriam conseguir acessar a região crítica ao mesmo tempo. É seguro dizer que se duas tarefas estão em condição de corrida, o comportamento delas será indeterminado. Fuja de condições de corrida como o diabo foge da cruz!
Exclusão Mútua
Exclusão mútua são as premissas e propriedades que você deve garantir para que duas tarefas não entrem em condição de corrida em uma região crítica. Para quem é de banco de dados, lembrará do ACID e de serialização de transações conflitantes, exatamente para evitar que elas rodem ao mesmo tempo.
E eu com isso?
Como dito anteriormente, há algumas formas de se implementar exclusão mútua, e em geral isso depende de suporte do hardware.
Desabilitar interrupções
Para sistemas preemptivos que são interrompidos pelo PIT (Programmable interval timer), o código abaixo pode funcionar (contém pegadinha):
cli
mov eax, [critical_region]
inc eax
mov [critical_region], eax
sti
O código acima recupera um valor da memória, incrementa-o e devolve para a memória. Ao desabilitar as interrupções (e reabilitar logo em seguida), garantimos que o PIT não nos incomodará e esse código executará completamente nesse processador…
… nesse processador é importante, porque esse é justamente o problema desse código. As instruções cli
/sti
só desabilitam as interrupções para o processador em que esse código está executando. Nada impede que outro processador acesse [critical_region]
nesse momento, e ai sua exclusão mútua não é garantida.
Claro que há casos de uso disso. Há estruturas de dados que notadamente são acessíveis apenas por um processador e nesses casos, essa solução funciona bem.
No nosso caso, sinto dizer, isso não vai funcionar. Estamos usando apenas um processador, ok, mas as interrupções já estão desabilitadas, e não as reativaremos enquanto não instalarmos o ISR (Interruption Service Routine), que só acontecerá no próximo trabalho. No nosso caso, a tarefa adquire o lock para entrar na região crítica e cede o processador no meio do processo. Desabilitar interrupções em um kernel não preemptivo não muda nada no fim das contas.
Operações atômicas
Se você notou bem, o problema todo do exemplo anterior é porque a operação toda na verdade levou três instruções: uma para trazer o dado da memória para o registrador, outra para incrementar o registrador e a última para devolver o dado para a memória. Se houvesse uma forma de fazer tudo isso de uma vez de forma garantida, já teríamos meio caminho andado.
De forma conceitual, há previsão de que o hardware forneça algum tipo de instrução Test and Set. Essa instrução deve setar o valor da memória, em geral para o valor 1, e devolve o valor antigo. E como isso é útil para garantir a Exclusão Mútua, você me pergunta? Bom, podemos implementar um Spinlock com isso.
Spinlock
A utilíssima página do OSDev tem um exemplo bem legal de um Spinlock, que chupinharei (afinal eu estou dando os devidos créditos). Lá vai:
#include <stdatomic.h>
void acquire_mutex(atomic_flag* mutex)
{
while(atomic_flag_test_and_set(mutex))
{
__builtin_ia32_pause();
}
}
void release_mutex(atomic_flag* mutex)
{
atomic_flag_clear(mutex);
}
E como isso funciona? Bom, partiremos da premissa que o mutex é um valor booleano inicialmente definido como falso.
A primeira tarefa tentará adquirir o lock, invocando a função acquire_mutex
. Essa, por sua vez, chama a função atomic_flag_test_and_set
, que define o novo valor de mutex para verdadeiro, e retorna o valor antigo (falso). Com falso, o loop while nem inicia. Pronto, a primeira tarefa “adquiriu” o lock.
As próximas tarefas farão a mesma coisa. Mas quando eles invocarem a função atomic_flag_test_and_set
, ela novamente irá definir o mutex para verdadeiro, mas isso é irrelevante. O que importa mesmo é que a função retorna, atomicamente, o valor definido pela primeira tarefa, que é verdadeiro no caso. Com isso, a condição do laço while
se torna verdadeira, e essa tarefa ficará rodando no laço (por isso spinlock. Sacou? Sacou?) até que a primeira tarefa chame a função release_mutex
, que atomicamente irá definir o lock como falso. Agora, dentro da condição do laço while
, a segunda tarefa novamente chamará a função atomic_flag_test_and_set
, que setará o mutex
como verdadeiro (é o que ela está tentando fazer desde o início!!) mas o valor retornado será falso, já que a primeira tarefa já liberou o lock.
Note, contudo, que esse tipo de spinlock não garante ordem. Se mais de uma tarefa estiver esperando pelo lock, a primeira escalonada logo após a liberação será a próxima a adquirir o lock, independente da ordem que as tarefas chegaram a esse ponto (estou considerando um kernel preemptivo em que as tarefas podem ser interrompidas em qualquer momento). Em geral, para tarefas que não possuem ordem de execução, essa solução é satisfatória. Se a ordem é importante, essa solução não pode ser usada dessa forma.
Outra consideração é que se a tarefa que estiver bloqueando as outras levar muito tempo para liberar o lock, há desperdício de CPU das tarefas bloqueadas rodando no loop em vão.
Para encerrar esse tópico, na arquitetura x86 não temos bem uma instrução Test and Set, mas temos uma instrução que faz algo parecido atomicamente: a instrução xchg. Essa instrução troca dois operandos de forma atômica. Portanto, se um dos operandos for 1 (nosso valor verdadeiro do spinlock), o outro é o valor antigo que deve ser testado. Funciona da mesma forma.
Para um kernel não-preemptivo, contudo, não podemos usar essa solução. Afinal, ninguém vai tirar a tarefa de execução se ela mesmo não o fizer usando usando yield. Vamos então para a próxima solução, que é a usada no nosso kernel.
Filas de tarefas bloqueadas
O melhor exemplo é a prática, então vamos aprender fazendo. O primeiro passo é adicionar um estado dentre os possíveis por nosso PCB
: BLOCKED
. Na classe PCB, adicionamos o método block
abaixo:
...
enum TASK_STATUS { READY, RUNNING, EXITED, BLOCKED };
class PCB {
...
void block() { status = BLOCKED; };
...
Espero que o código acima seja autoexplicativo. Agora, adicionamos a nova classe no arquivo kernel.h
:
...
#include <atomic>
...
class Lock {
std::atomic_bool mutex{false};
Datastructure::QueueNode<PCB> *blocked{nullptr};
public:
void lock_acquire();
void lock_release();
void block(Datastructure::QueueNode<PCB> *task) {
...
if (blocked == nullptr) {
blocked = task;
} else {
blocked->insert(task);
}
}
Datastructure::QueueNode<PCB> *unblock() {
Datastructure::QueueNode<PCB> *tmp = blocked;
...
this->blocked = blocked->remove(tmp);
return tmp;
}
};
Aqui, demos aquela roubada básica e usamos o tipo atomic_bool
do namespace std
, padrão do compilador. Como a própria página do OSDev informa:
If you are using GCC or Clang, you have access to the freestanding C11 header stdatomic.h, which can be used to implement a spinlock.
Então maravilha, estamos em um ambiente freestanding (sem stdlib
), e ainda assim nosso código vai funcionar. Quando chegar o ponto exato, eu mostro o código assembly gerado pelo compilador, matando assim a cobra e mostrando a cobra, como diria alguns. Não achei que mostrar o pau fosse apropriado, se contenha.
Note também que temos uma fila de PCB
chamada blocked
. Espero que o nome seja explicativo o suficiente, é isso mesmo que você está pensando. Quando uma tarefa é bloqueada por não obter o lock, ele vem para essa fila aqui.
Mas que blasfêmia! Quer dizer que o escalonador não tem controle sobre os processos bloqueados pelos locks? Quem tem gerência sobre isso. Então, pequeno gafanhoto, é isso ai mesmo. Quem tem controle sobre os processos bloqueados por não se obter o lock é a própria classe Lock
.
Os métodos block
e unblock
apenas manipulam a fila de tarefas bloqueadas por esse lock, seja inserindo-a na fila ou devolvendo a primeira tarefa “desbloqueada”. Note já que aqui, a ordem de aquisição de locks é preservada. We’ve halfway there.
Vamos ver os outros dois métodos, que estão no arquivo lock.cpp:
#include "include/kernel.h"
#include "include/util.h"
#include <atomic>
extern Scheduler sched;
void Lock::lock_acquire() {
if (this->mutex.exchange(true)) {
sched.block(this);
}
}
...
Parece simples. A primeira coisa diferente de outros lugares que você verá por ai é que outras soluções chamam um método chamado lock_init
(o próprio P2 prevê isso, vá lá checar!). No nosso caso, não precisamos, afinal estamos usando C++, programação orientada a objetos (COF COF) e, bem, o construtor já fez isso pra gente:
std::atomic_bool mutex{false};
Indo para as funções de aquisição e liberação do lock, perceba que ela é muito parecida com a do spinlock. A diferença no lock_acquire
é que apenas um if
é suficiente. Se o mutex já tiver sido adquirido por outra tarefa, solicitamos ao escalonador que bloqueie a tarefa atual. O this
do sched.block(this)
não é a tarefa, observe a classe do método em questão. Na verdade solicitamos ao escalonador que bloqueie a tarefa atual e a insira na fila de bloqueados deste Lock(this
).
E o que o block do escalonador faz? Vamos olhar:
void Scheduler::block(Lock *lock) {
current_task->get().block();
lock->block(current_task);
// Util::printk("Blocking task %d\n", current_task->get().get_pid() + 1);
scheduler_entry();
}
Ele altera o estado da tarefa atual e a insere na fila de bloqueados desse lock. Após isso, a tarefa atual não tem mais condições de continuar. Por isso, o escalonador invoca a scheduler_entry
, que como vimos anteriormente, invoca a função scheduler
, que é quem escolhe a nova tarefa para execução. A tarefa bloqueada, quando liberada, continua a execução de onde parou, ou seja, a linha logo após sched.block(this);
da função Lock::lock_acquire
. Como não há nada mais ali para ser executado, essa função só retorna e a tarefa continua executando normalmente.
E a função atômica que você me prometeu hein, você está me enrolando! Calma, pequeno padawan, se colocarmos um breakpoint na linha this->mutex.exchange(true):
Há! olha ali na memória 0x1dc8
a instrução xchg
, cqd.
E quando a tarefa em questão não precisa mais do lock? Ai ela invoca Lock::lock_release
.
...
void Lock::lock_release() {
if (this->blocked != nullptr) {
sched.unblock(this);
} else {
...
this->mutex.store(false);
}
}
Note que se há tarefas bloqueadas na fila, ele desbloqueia a tarefa mas não libera o lock ainda!!! Quem vai fazer isso é a última tarefa da fila de lock, que faz isso de forma atômica na linha this->mutex.store(false);
.
Vamos ver o que ocorre na sched.unblock(this);
.
...
void Scheduler::unblock(Lock *lock) {
Datastructure::QueueNode<PCB> *tmp = lock->unblock();
tmp->get().ready();
// Util::printk("Unblocking task %d\n", tmp->get().get_pid() + 1);
this->resched(tmp);
}
A diferença notável aqui é que primeira tarefa da fila blocked
do Lock
em questão só muda de fila; ela sai da fila do lock e vai para a fila ready_queue
do escalonador. Mas, diferente de lock_acquire
, o escalonador não é invocado! A tarefa que liberou o lock possivelmente liberou outra, mas ela continua a execução normalmente, afinal, ela ainda não fez do_yield
e tampouco do_exit
. É isso.
Juntando a parada toda
O P2 apresenta 4 threads que usam um lock, e eles projetaram de uma forma que só há uma ordem de execução desses comandos. Fiz uma pequena alteração e impressões no original para vermos informações úteis na tela. Nossas threads estão assim:
#include "include/tasks.h"
#include "include/kernel.h"
#include "include/screen.h"
#include "include/util.h"
Lock lock;
int order = 0;
void thread1() {
Screen::set_pos(0, 0);
Util::printk("#1 started (%d) and is trying to acquire the lock\n", order++);
lock.lock_acquire();
Util::printk("#1 acquired the lock and yielded (%d).\n", order++);
do_yield();
Util::printk("#1 woke up (%d).\n", order++);
lock.lock_release();
Util::printk("#1 released the lock and exited (%d).\n", order++);
do_exit();
}
void thread2() {
int thread2_order = 0;
Screen::set_pos(5, 0);
Util::printk("#2 started (%d).\n", order++);
while (true) {
Util::printk(" \n");
Util::printk(" \n");
Screen::set_pos(6, 0);
Util::printk("#2 yielded (t2 %d).\n", thread2_order++);
do_yield();
Util::printk("#2 woke up (t2 %d).\n", thread2_order++);
Screen::set_pos(6, 0);
}
}
void thread3() {
Screen::set_pos(9, 0);
Util::printk("#3 started and yielded (%d).\n", order++);
do_yield();
Util::printk("#3 woke up (%d) and is trying to obtain the lock.\n", order++);
lock.lock_acquire();
Util::printk("#3 acquired the lock (%d).\n", order++);
lock.lock_release();
Util::printk("#3 released the lock and exited (%d).\n", order++);
do_exit();
}
void thread4() {
Screen::set_pos(14, 0);
Util::printk("#4 started (%d) and is trying to obtain the lock.\n", order++);
lock.lock_acquire();
Util::printk("#4 acquired the lock (%d).\n", order++);
lock.lock_release();
Util::printk("#4 released the lock and exited (%d).\n", order++);
do_exit();
}
Um teste de mesa aqui é bem útil para você saber o desenrolar da execução. Não é uma dinâmica tão extensa, e sabendo que as tarefas são inseridas nessa ordem na fila de tarefas prontas, você consegue saber certinho como fica a execução. A tarefa dois atrapalha um pouco pois ela não participa do Lock e fica sendo removida e reinserida na fila de prontos a todo momento. Você pode ignorá-la para este propósito.
De lição de casa, compare a sua execução do teste de mesa com o print abaixo. Os números entre parênteses indicam a ordem dos comandos. Com essa primitiva de sincronização, podemos considerar concluído mais uma etapa do P2, faltando apenas várias VÁRIAS outras, mas quem está reclamando?
Esse código, assim como todos os anteriores, está no meu github.