concurrent - java synchronized variable



Procesamiento secuencial y paralelo. (7)

Tengo un productor y muchos consumidores.

  • El productor es rápido y genera muchos resultados.
  • Los tokens con el mismo valor deben procesarse secuencialmente.
  • Los tokens con diferentes valores deben procesarse en paralelo.
  • crear nuevos Runnables sería muy costoso y también el código de producción podría funcionar con 100k de Tokens (para crear un Runnable tengo que pasarle al constructor algo complejo para construir objetos)

¿Puedo lograr los mismos resultados con un algoritmo más simple? Anidar un bloque de sincronización con un bloqueo de reentrada parece un poco antinatural. ¿Hay alguna condición de raza que puedas notar?

Actualización: una segunda solución que encontré estaba trabajando con 3 colecciones. Uno para cachear los resultados del productor, segundo una cola de bloqueo y tercero usando una lista para rastrear en las tareas en progreso. De nuevo un poco complicado.

Mi version de codigo

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class Main1 {
    static class Token {
        private int order;
        private String value;
        Token() {

        }
        Token(int o, String v) {
            order = o;
            value = v;
        }

        int getOrder() {
            return order;
        }

        String getValue() {
            return value;
        }
    }

    private final static BlockingQueue<Token> queue = new ArrayBlockingQueue<Token>(10);
    private final static ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
    private final static ReentrantLock reentrantLock = new ReentrantLock();
    private final static Token STOP_TOKEN = new Token();
    private final static List<String> lockList = Collections.synchronizedList(new ArrayList<String>());

    public static void main(String[] args) {
        ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
        producerExecutor.submit(new Runnable() {
            public void run() {
                Random random = new Random();
                    try {
                        for (int i = 1; i <= 100; i++) {
                            Token token = new Token(i, String.valueOf(random.nextInt(1)));

                            queue.put(token);
                        }

                        queue.put(STOP_TOKEN);
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                }
        });

        ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
        for(int i=1; i<=10;i++) {

            // creating to many runnable would be inefficient because of this complex not thread safe object
            final Object dependecy = new Object(); //new ComplexDependecy()
            consumerExecutor.submit(new Runnable() {
                public void run() {
                    while(true) {
                        try {
                            //not in order


                            Token token = queue.take();
                            if (token == STOP_TOKEN) {
                                queue.add(STOP_TOKEN);
                                return;
                            }


                            System.out.println("Task start" + Thread.currentThread().getId() + " order "  + token.getOrder());

                            Random random = new Random();
                            Thread.sleep(random.nextInt(200)); //doLongRunningTask(dependecy)
                            lockList.remove(token.getValue());

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            }});

    }
}}

https://ffff65535.com


Los tokens con el mismo valor deben procesarse secuencialmente.

La forma de asegurar que ocurran dos cosas en secuencia es hacerlas en el mismo hilo.

Tendría una colección de muchos hilos de trabajadores, y tendría un Mapa. Cada vez que recibo un token que no he visto antes, escogeré un hilo al azar e ingresaré el token y el hilo en el mapa. A partir de entonces, usaré ese mismo hilo para ejecutar tareas asociadas con ese token.

Crear nuevos Runnables sería muy costoso

Runnable es una interfaz. Crear nuevos objetos que implementen Runnable no será significativamente más costoso que crear cualquier otro tipo de objeto.


¿Es todo lo que necesita para asegurarse de que los tokens con el mismo valor no se procesen simultáneamente? Su código es demasiado complicado para entender lo que quiere decir (no se compila y tiene muchas variables, bloqueos y mapas no utilizados, que se crean pero nunca se usan). Parece que estás pensando demasiado en esto. Todo lo que necesita es una cola y un mapa. Algo así me imagino:

   class Consumer implements Runnable {
     ConcurrentHashMap<String, Token> inProcess;
     BlockingQueue<Token> queue;

     public void run() {
        Token token = null;
        while ((token = queue.take()) != null) {
           if(inProcess.putIfAbsent(token.getValue(), token) != null) {
              queue.put(token);
              continue;
           }
           processToken(token);
           inProcess.remove(token.getValue());
        }
     }
   }

La siguiente solución solo utilizará un Mapa único que utilizan el productor y los consumidores para procesar los pedidos en orden secuencial para cada número de pedido mientras procesan diferentes números de pedido en paralelo. Aquí está el código:

public class Main {

    private static final int NUMBER_OF_CONSUMER_THREADS = 10;
    private static volatile int sync = 0;

    public static void main(String[] args) {
        final ConcurrentHashMap<String,Controller> queues = new ConcurrentHashMap<String, Controller>();
        final CountDownLatch latch = new CountDownLatch(NUMBER_OF_CONSUMER_THREADS);
        final AtomicBoolean done = new AtomicBoolean(false);

        // Create a Producer
        new Thread() {
            {
                this.setDaemon(true);
                this.setName("Producer");
                this.start();
            }

            public void run() {
                Random rand = new Random();

                for(int i =0 ; i < 1000 ; i++) {
                    int order = rand.nextInt(20);
                    String key = String.valueOf(order);
                    String value = String.valueOf(rand.nextInt());
                    Controller controller = queues.get(key);
                    if (controller == null) {
                        controller = new Controller();
                        queues.put(key, controller);
                    }
                    controller.add(new Token(order, value));
                    Main.sync++;
                }

                done.set(true);
            }
        };

        while (queues.size() < 10) {
            try {
                // Allow the producer to generate several entries that need to
                // be processed.
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }

        // System.out.println(queues);

        // Create the Consumers
        ExecutorService consumers = Executors.newFixedThreadPool(NUMBER_OF_CONSUMER_THREADS);
        for(int i = 0 ; i < NUMBER_OF_CONSUMER_THREADS ; i++) {
            consumers.submit(new Runnable() {
                private Random rand = new Random();

                public void run() {
                    String name = Thread.currentThread().getName();
                    try {
                        boolean one_last_time = false;
                        while (true) {
                            for (Map.Entry<String, Controller> entry : queues.entrySet()) {
                                Controller controller = entry.getValue();
                                if (controller.lock(this)) {
                                    ConcurrentLinkedQueue<Token> list = controller.getList();
                                    Token token;
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(rand.nextInt(200));
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    int last = Main.sync;
                                    queues.remove(entry.getKey());
                                    while(done.get() == false && last == Main.sync) {
                                        // yield until the producer has added at least another entry
                                        Thread.yield();
                                    }
                                    // Purge any new entries added
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(200);
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    controller.unlock(this);
                                }
                            }
                            if (one_last_time) {
                                return;
                            }
                            if (done.get()) {
                                one_last_time = true;
                            }
                        }
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        consumers.shutdown();
        System.out.println("Exiting.. remaining number of entries: " + queues.size());
    }

}

Tenga en cuenta que la clase Main contiene una instancia de colas que es un mapa. La clave del mapa es la identificación del pedido que desea que los consumidores procesen secuencialmente. El valor es una clase de Controlador que contendrá todos los pedidos asociados con ese ID de pedido.

El productor generará los pedidos y agregará el pedido (Token) a su Controlador asociado. Los consumidores repetirán los valores del mapa de colas y llamarán al método de bloqueo del controlador para determinar si puede procesar pedidos para esa identificación de pedido en particular. Si el bloqueo devuelve falso, comprobará la próxima instancia del Controlador. Si el bloqueo es verdadero, procesará todos los pedidos y luego verificará el siguiente Controlador.

actualizado Se agregó el número entero de sincronización que se usa para garantizar que cuando se elimina una instancia del Controlador del mapa de colas. Todas sus entradas serán consumidas. Hubo un error lógico en el código del consumidor donde se llamó al método de desbloqueo pronto.

La clase Token es similar a la que has publicado aquí.

class Token {
    private int order;
    private String value;

    Token(int order, String value) {
        this.order = order;
        this.value = value;
    }

    int getOrder() {
        return order;
    }

    String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Token [order=" + order + ", value=" + value + "]\n";
    }
}

La clase de Controlador que sigue se usa para asegurar que solo un solo hilo dentro del grupo de hilos procesará las órdenes. Los métodos de bloqueo / desbloqueo se utilizan para determinar cuáles de los subprocesos podrán procesar los pedidos.

class Controller {

    private ConcurrentLinkedQueue<Token> tokens = new ConcurrentLinkedQueue<Token>();
    private ReentrantLock lock = new ReentrantLock();
    private Runnable current = null;

    void add(Token token) {
        tokens.add(token);
    }

    public ConcurrentLinkedQueue<Token> getList() {
        return tokens;
    }

    public void unlock(Runnable runnable) {
        lock.lock();
        try {
            if (current == runnable) {
                current = null;
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean lock(Runnable runnable) {
        lock.lock();
        try {
            if (current == null) {
                current = runnable;
            }
        } finally {
            lock.unlock();
        }
        return current == runnable;
    }

    @Override
    public String toString() {
        return "Controller [tokens=" + tokens + "]";
    }

}

Información adicional sobre la implementación. Utiliza un CountDownLatch para asegurar que todas las órdenes producidas se procesarán antes de salir del proceso. La variable realizada es igual que su variable STOP_TOKEN.

La implementación contiene un problema que debe resolver. Existe el problema de que no purga el controlador para un ID de pedido cuando se han procesado todos los pedidos. Esto provocará instancias en las que un subproceso en el grupo de subprocesos se asigna a un controlador que no contiene órdenes. Lo que desperdiciará ciclos de cpu que podrían usarse para realizar otras tareas.


No estoy del todo seguro de haber entendido la pregunta, pero intentaré un algoritmo.

Los actores son:

  • Una queue de tareas
  • Un pool de executors libres.
  • Un set de tokens in-process actualmente en proceso
  • Un controller

Entonces,

  • Inicialmente todos los executors están disponibles y el set está vacío.

  • controller elige un executor disponible y recorre la queue busca de una task con un token que no está en el conjunto in-process set y cuando lo encuentra

    • agrega el token al conjunto in-process
    • asigna el executor para procesar la task y
    • vuelve al principio de la cola
  • el executor elimina el token del set cuando termina de procesarse y se agrega a la agrupación


Puede crear previamente un conjunto de Runnables que recogerá las tareas entrantes (tokens) y las colocará en colas de acuerdo con su valor de orden.

Como se señaló en los comentarios, no está garantizado que los tokens con valores diferentes siempre se ejecuten en paralelo (en general, está limitado, al menos, por el número de núcleos físicos en su caja). Sin embargo, se garantiza que las fichas con la misma orden se ejecutarán en el orden de llegada.

Código de muestra:

/**
 * Executor which ensures incoming tasks are executed in queues according to provided key (see {@link Task#getOrder()}).
 */
public class TasksOrderingExecutor {

    public interface Task extends Runnable {
        /**
         * @return ordering value which will be used to sequence tasks with the same value.<br>
         * Tasks with different ordering values <i>may</i> be executed in parallel, but not guaranteed to.
         */
        String getOrder();
    }

    private static class Worker implements Runnable {

        private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

        private volatile boolean stopped;

        void schedule(Task task) {
            tasks.add(task);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Task task = tasks.take();
                    task.run();
                } catch (InterruptedException ie) {
                    // perhaps, handle somehow
                }
            }
        }
    }

    private final Worker[] workers;
    private final ExecutorService executorService;

    /**
     * @param queuesNr nr of concurrent task queues
     */
    public TasksOrderingExecutor(int queuesNr) {
        Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
        executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        workers = new Worker[queuesNr];
        for (int i = 0; i < queuesNr; i++) {
            Worker worker = new Worker();
            executorService.submit(worker);
            workers[i] = worker;
        }
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.schedule(task);
    }

    public void stop() {
        for (Worker w : workers) w.stop();
        executorService.shutdown();
    }

    private Worker getWorker(Task task) {
        return workers[task.getOrder().hashCode() % workers.length];
    }
}

Si tiene muchos tokens diferentes, entonces la solución más simple es crear un número de ejecutores de un solo hilo (aproximadamente 2 veces su número de núcleos), y luego distribuir cada tarea a un ejecutor determinado por el hash de su token.

De esa forma, todas las tareas con el mismo token irán al mismo ejecutor y se ejecutarán de forma secuencial, ya que cada ejecutor solo tiene un subproceso.

Si tiene algunos requisitos no declarados sobre la imparcialidad de la programación, entonces es bastante fácil evitar cualquier desequilibrio significativo haciendo que el subproceso del productor ponga en cola sus solicitudes (o bloque) antes de distribuirlas, hasta que haya, por ejemplo, menos de 10 solicitudes por ejecutor pendientes. .


Una forma de hacer esto es tener un ejecutor para el procesamiento de secuencias y otro para el procesamiento paralelo. También necesitamos un servicio de administrador de un solo hilo que decidirá qué token de servicio debe enviarse para su procesamiento. // Cola para ser compartida por ambos hilos. Contiene los tokens producidos por el productor.
BlockingQueue tokenList = new ArrayBlockingQueue (10);

    private void startProcess() {
    ExecutorService producer = Executors.newSingleThreadExecutor();
    final ExecutorService consumerForSequence = Executors
            .newSingleThreadExecutor();
    final ExecutorService consumerForParallel = Executors.newFixedThreadPool(10);
    ExecutorService manager = Executors.newSingleThreadExecutor();

    producer.submit(new Producer(tokenList));

    manager.submit(new Runnable() {

        public void run() {
            try {
                while (true) {
                    Token t = tokenList.take();
                    System.out.println("consumed- " + t.orderid
                            + " element");

                    if (t.orderid % 7 == 0) { // any condition to check for sequence processing

                        consumerForSequence.submit(new ConsumerForSequenceProcess(t));

                    } else {

                        ConsumerForParallel.submit(new ConsumerForParallelProcess(t));

                    }
                }
            }

            catch (InterruptedException e) { // TODO Auto-generated catch
                // block
                e.printStackTrace();
            }

        }
    });
}




locking