Problem producentów i konsumentów

Użycie

Przejdź do folderu z kodem źródłowym, następnie:

make
./producers-consumers NO_PRODUCERS NO_CONSUMERS [-d]

Opcjonalny parametr -d (debug) włącza wypisywanie szczegółowych informacji o przebiegu algorytmu.

Kod źródłowy

#include <errno.h>
#include <limits.h>
#include <memory.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "error.h"

// Produced items are integers in [0, RANGE): producers compute rand() % RANGE.
#define RANGE 100 // Range of produced items (integers)
// Capacity of the shared buffer, in items.
#define BUFFER_SIZE 50
// Shared storage for produced items: written by putItem(), read by removeItem().
int BUFFER[BUFFER_SIZE];

// Thread counts requested on the command line; set by parseArguments() in
// main() before any thread is created.
int NO_PRODUCERS;
int NO_CONSUMERS;
// Set to true by parseArguments() when "-d" is given; when true, worker
// threads print a line before each semaphore wait.
bool DEBUG;

// Locked by producers and consumers around putItem() / removeItem().
pthread_mutex_t bufferMutex = PTHREAD_MUTEX_INITIALIZER;
// Initialised to 0 in main(); producers post it after putting an item,
// consumers wait on it before removing one.
sem_t filledSpaceSemaphore;
// Initialised to BUFFER_SIZE in main(); producers wait on it before putting
// an item, consumers post it after removing one.
sem_t leftSpaceSemaphore;

/*
 * Returns the current value of `sem` as reported by sem_getvalue().
 *
 * The result is only a snapshot: other threads may change the semaphore
 * immediately afterwards, so it is suitable for diagnostics but must not be
 * treated as a stable count.
 *
 * If sem_getvalue() fails (e.g. invalid semaphore, or a platform that does not
 * implement it) the failure is reported through throwError() instead of
 * returning an uninitialised value. throwError() is assumed not to return;
 * should it return, 0 is returned.
 */
int semaphoreGetVal(sem_t *sem){
    int sval = 0;
    if (sem_getvalue(sem, &sval) != 0) {
        throwError("sem_getvalue error");
    }
    return sval;
}

void putItem(int item){
    int index = semaphoreGetVal(&filledSpaceSemaphore);
    if(index > BUFFER_SIZE){throwError("putItem error filledSpaceSemaphore > BUFFER_SIZE");}
    BUFFER[index] = item;
    printf("Producer TID=%ld put item=%i to index=%i.\n", (long) pthread_self(), item, index);
}

void *producerJob(void *arg) {
    while (1) {
        // Produce item
        int item = rand() % RANGE;

        if(DEBUG){
            printf("Producer TID=%ld is trying to decrement leftSpaceSemaphore=%i.\n",
                   (long) pthread_self(), semaphoreGetVal(&leftSpaceSemaphore));
        }
        sem_wait(&leftSpaceSemaphore);
        pthread_mutex_lock(&bufferMutex);

        // Put item into buffer
        putItem(item);

        pthread_mutex_unlock(&bufferMutex);
        sem_post(&filledSpaceSemaphore);
    }
    return 0;
}

int removeItem(){
    int indexToRemove = semaphoreGetVal(&filledSpaceSemaphore);
    if(indexToRemove < 0){throwError("removeItem error filledSpaceSemaphore < 0");}
    int item = BUFFER[indexToRemove];
    printf("Consumer TID=%ld removed item=%i from index=%i.\n", (long) pthread_self(), item, indexToRemove);
    return item;
}

/*
 * Thread entry point for a consumer. Runs an endless loop that waits for an
 * available item, removes it from the buffer while holding bufferMutex,
 * releases one unit of free space and then "consumes" the item.
 *
 * `arg` is not used. The loop has no exit, so the trailing return exists only
 * to match the pthread start-routine signature.
 */
void *consumerJob(void *arg) {
    for (;;) {
        if(DEBUG){
            printf("Consumer TID=%ld is trying to decrement filledSpaceSemaphore=%i.\n",
                   (long) pthread_self(), semaphoreGetVal(&filledSpaceSemaphore));
        }

        // Claim an item before taking the mutex.
        sem_wait(&filledSpaceSemaphore);
        pthread_mutex_lock(&bufferMutex);

        int consumed = removeItem();

        pthread_mutex_unlock(&bufferMutex);
        // One slot has been freed for producers.
        sem_post(&leftSpaceSemaphore);

        // Placeholder for real work on the item: double it and discard it.
        consumed = consumed + consumed;
    }
    return 0;
}

/*
 * Parses `text` as a non-negative base-10 integer that fits in an int.
 * On success stores the value in *out and returns true; on any failure
 * (no digits, trailing characters, negative or out-of-range value) returns
 * false and leaves *out untouched.
 */
static bool parseNonNegativeInt(const char *text, int *out) {
    char *end = NULL;
    errno = 0;
    long value = strtol(text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE) {
        return false;
    }
    if (value < 0 || value > INT_MAX) {
        return false;
    }
    *out = (int) value;
    return true;
}

/*
 * Reads the program's command-line arguments.
 *
 * argv[1] is stored in *noProducers and argv[2] in *noConsumers; both must be
 * non-negative decimal integers. If exactly one additional argument is present
 * and it equals "-d", *debug is set to true; otherwise *debug is left as it
 * was.
 *
 * Errors (too few arguments, malformed numbers) are reported through
 * throwError(). throwError() is assumed not to return; the explicit returns
 * below only keep this function from touching invalid argv entries if it does.
 *
 * Unlike atoi(), which silently yields 0 for garbage and accepts negative
 * numbers, the counts are validated with strtol().
 */
void parseArguments(int argc, char *argv[], int minNumberOfArguments,
                    int *noProducers, int *noConsumers, bool *debug) {
    // argv[1] and argv[2] are always read, so at least two arguments are
    // required regardless of minNumberOfArguments.
    if (argc < minNumberOfArguments + 1 || argc < 3) {
        throwError(
                "Not enough arguments. Usage: ./main NO_PRODUCERS NO_CONSUMERS [-d]");
        return;
    }
    int currentArgument = 1;
    if (!parseNonNegativeInt(argv[currentArgument++], noProducers)) {
        throwError("NO_PRODUCERS must be a non-negative integer");
        return;
    }
    if (!parseNonNegativeInt(argv[currentArgument++], noConsumers)) {
        throwError("NO_CONSUMERS must be a non-negative integer");
        return;
    }
    if (argc == minNumberOfArguments + 2 && strcmp(argv[currentArgument++], "-d") == 0) {
        (*debug) = true;
    }
}

/*
 * Program entry point: parses the command line, initialises the two counting
 * semaphores, starts NO_PRODUCERS producer threads and NO_CONSUMERS consumer
 * threads, then joins them.
 *
 * Both thread routines loop forever, so the joins do not return in normal
 * operation and the cleanup at the end is reached only if the threads
 * terminate.
 */
int main(int argc, char *argv[]) {
    parseArguments(argc, argv, 2, &NO_PRODUCERS, &NO_CONSUMERS, &DEBUG);
    printf("Producers-consumers problem.\n");
    printf("    NO_PRODUCERS=%i, NO_CONSUMERS=%i, DEBUG=%i\n\n",
           NO_PRODUCERS, NO_CONSUMERS, DEBUG);

    // A negative count would turn into a huge size_t in the allocations below.
    if (NO_PRODUCERS < 0 || NO_CONSUMERS < 0) {
        throwError("NO_PRODUCERS and NO_CONSUMERS must be non-negative");
        return EXIT_FAILURE;
    }

    // Initialize semaphores: no items yet, every slot free
    if (sem_init(&filledSpaceSemaphore, 0, 0)) throwError("sem_init error");
    if (sem_init(&leftSpaceSemaphore, 0, BUFFER_SIZE)) throwError("sem_init error");

    // Initialize arrays of threads IDs.
    // Each array is sized by its own thread count; the consumers array used
    // to be sized by NO_PRODUCERS, which overflowed the heap whenever
    // NO_CONSUMERS > NO_PRODUCERS.
    // malloc(0) may legitimately return NULL, hence the count checks.
    pthread_t *producersThreadsIds = malloc((size_t) NO_PRODUCERS * sizeof *producersThreadsIds);
    if (producersThreadsIds == NULL && NO_PRODUCERS > 0) { cannotAllocateMemoryError(); }
    pthread_t *consumersThreadsIds = malloc((size_t) NO_CONSUMERS * sizeof *consumersThreadsIds);
    if (consumersThreadsIds == NULL && NO_CONSUMERS > 0) { cannotAllocateMemoryError(); }

    // Create producers threads
    for (int i = 0; i < NO_PRODUCERS; ++i) {
        if (pthread_create(&producersThreadsIds[i], NULL, producerJob, NULL) != 0) {
            throwError("pthread_create error");
        }
    }
    // Create consumers threads
    for (int i = 0; i < NO_CONSUMERS; ++i) {
        if (pthread_create(&consumersThreadsIds[i], NULL, consumerJob, NULL) != 0) {
            throwError("pthread_create error");
        }
    }

    // Wait for producers to finish
    for (int i = 0; i < NO_PRODUCERS; ++i) {
        if (pthread_join(producersThreadsIds[i], NULL) != 0) {
            throwError("pthread_join error");
        }
    }
    // Wait for consumers to finish
    for (int i = 0; i < NO_CONSUMERS; ++i) {
        if (pthread_join(consumersThreadsIds[i], NULL) != 0) {
            throwError("pthread_join error");
        }
    }

    free(producersThreadsIds);
    free(consumersThreadsIds);
    pthread_mutex_destroy(&bufferMutex);
    sem_destroy(&filledSpaceSemaphore);
    sem_destroy(&leftSpaceSemaphore);

    return 0;
}