1С Kafka быстрый старт обмена данными.
Задача публикации, реализовать крайне простой и упрощенный вариант обмена фотографиями, между приложением на мобильной платформе 1С, брокером сообщений Apache Kafka и клиент-серверным приложением 1С. Для взаимодействия с Apache Kafka, используется Kafka REST Proxy.
Описание работы информационной системы «1С Kafka»:
- В приложении для мобильной платформы 1С, выполняется фотосъемка с дальнейшей отправкой, закодированных «Base64» двоичных данных в Apache Kafka используя Kafka REST Proxy.
- В клиент-серверном приложении, вручную опрашивается Apache Kafka и в случае наличия данных, производится загрузка в регистр сведений.
Примечание:
Рассматриваются не все необходимые действия по настройке информационной системы «1С Kafka», например, не создаются пользователи в ОС Debian (вся работа из-под root), не настраивается брандмауэр, директории и прочие аспекты. Не рассматривается внутренняя структура работы Apache Kafka (темы, разделы и смещения), а также дополнительная настройка конфигурационных файлов. Сделано это с целью «быстрого старта», для более глубокой настройки, в конце публикации, предоставлены все необходимые ссылки на источники информации.
Создание VDS для 1С Kafka
Apache Kafka будет является ядром системы и базироваться на выделенном виртуальном сервере. В этот раз будет использоваться базовый образ на основе операционной системы Debian. Действия по созданию, те же самые что и в статье «Телеграм бот Node.js Webhook», за исключение того, что SSH ключ, будет использоваться ранее созданный. В качестве SSH клиента, будет использоваться программа «MobaXterm». Объем оперативной памяти на сервере, RAM 2 Gb для того чтобы смогла запуститься Java VM и все необходимые компоненты.
Настройка Debian
После создания виртуального сервера из образа и запуска VDS, подключаемся по протоколу SSH, с помощью программы «MobaXterm», использую логин root и пароль высланный на электронную почту.
Выполняем следующие команды, проверка текущей используемой локали:
root@box-669849:~# locale
LANG=en_US.UTF-8
LANGUAGE=
Проверка наличия русской локали:
root@box-669849:~# locale -a | grep 'ru'
ru_RU
ru_RU.cp1251
ru_RU.koi8r
ru_RU.utf8
ru_UA
ru_UA.utf8
Установка русской локали:
root@box-669849:~# export LANG="ru_RU.utf8" root@box-669849:~# update-locale LANG=ru_RU.utf8 root@box-669849:~# cat /etc/default/locale
# File generated by update-locale
LANG=ru_RU.utf8
Далее, необходимо обновить ОС Debian, для этого выполняем команды:
root@box-669849:~# apt update root@box-669849:~# apt upgrade
После необходимо перезагрузится:
root@box-669849:~# reboot
Установка и запуск Apache Kafka
Для работы Apache Kafka необходима Java, достаточно будет Java Runtime Edition (JRE), для этого выполняем следующую команду:
root@box-669849:~# apt install -y default-jre
Проверка установки Java:
root@box-669849:~# java -version
openjdk version "11.0.11" 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-post-Debian-1deb10u1)
OpenJDK 64-Bit Server VM (build 11.0.11+9-post-Debian-1deb10u1, mixed mode, sharing)
Проверяем каталог установки Java:
root@box-669849:~# update-alternatives --list java
/usr/lib/jvm/java-11-openjdk-amd64/bin/java
Находясь в текущей директории, скачиваем Apache Kafka:
root@box-669849:~# wget https://mirror.linux-ia64.org/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
Распаковываем архив Apache Kafka:
root@box-669849:~# tar -xzf kafka_2.13-2.8.0.tgz
Будет создана директория kafka_2.13-2.8.0 в которой располагаются необходимые файлы для запуска.
Перед запуском Apache Kafka, необходимо запустить Apache ZooKeeper – который используется для хранения метаданных о кластере Kafka, для этого переходим в папку:
root@box-669849:~# cd kafka_2.13-2.8.0/bin/
Запускаем ZooKeeper с конфигурационным файлом zookeeper.properties:
root@box-669849:~/kafka_2.13-2.8.0/bin# ./zookeeper-server-start.sh ../config/zookeeper.properties
В новой вкладке, создаем новое подключение к серверу, переходим в директорию «/kafka_2.13-2.8.0/bin» и запускаем Apache Kafka:
root@box-669849:~/kafka_2.13-2.8.0/bin# ./kafka-server-start.sh ../config/server.properties
Создаем новое подключение к серверу, переходим в директорию «/kafka_2.13-2.8.0/bin» и создаем тему «SendingPhotos» в Apache Kafka:
root@box-669849:~/kafka_2.13-2.8.0/bin# ./kafka-topics.sh --create --topic SendingPhotos --bootstrap-server localhost:9092
Created topic SendingPhotos.
Установка и запуск Kafka REST Proxy
Kafka REST Proxy — это интерфейс RESTful для кластера Apache Kafka, упрощает создание и использование сообщений, просмотр состояния кластера и выполнение административных действий без использования собственного протокола Kafka или клиентов.
Скачиваем Kafka REST Proxy в ту же директорию что и Apache Kafka:
wget http://packages.confluent.io/archive/6.2/confluent-6.2.0.tar.gz
Распаковываем архив Kafka REST Proxy:
root@box-669849:~# tar -xzf confluent-6.2.0.tar.gz
Переходим в директорию bin:
root@box-669849:~# cd confluent-6.2.0/bin
Запускаем Kafka REST Proxy:
root@box-669849:~/confluent-6.2.0/bin# ./kafka-rest-start ../etc/kafka-rest/kafka-rest.properties
Проверим подключение и работу Kafka REST Proxy с помощью программы Postman, для этого получим список текущих тем, сформировав GET запрос следующего вида:
В результате, получаем список тем, в данном случае, только ранее созданная тема «SendingPhotos».
Создание приложения для мобильной платформы 1С
Следующий шаг, создание простого мобильного приложения, которое будет отправлять фотографии в Apache Kafka. Для этого, на локальном компьютере должен быть установлен веб-сервер Apache, а на телефоне/планшете мобильная платформа 1С. Более подробно процесс разработки и установки приложения на мобильную платформу, описан в публикации «Разработка мобильных приложений 1С».
Открываем конфигуратор и создаем пустую конфигурацию, меняем настройку «Назначения использования» на «Приложение для мобильной платформы» и включаем опцию «Камера» в «Используемая функциональность мобильного приложения»:
Устанавливаем в конфигурацию «Коннектор: удобный HTTP-клиент для 1С:Предприятие 8» от Владимира Бондаревского, как это сделать подробно описано в публикации «Телеграм бот 1С».
Создаем общую форму «ОтправкаФотографий» и располагаем ее в рабочей области начальной страницы:
Примечание:
Для простоты разработки, используется указание адресов прямо в коде конфигурации, так называемый «хардкод», использовать который можно лишь в тестовых системах.
На форму добавляем реквизит «ТекущаяФотография» с типом «Строка» и видом «Поле картинки», две команды «СделатьФото» и «ОтправитьФото», а также обработчики для этих команд.
Код обработчика «Сделать фото»:
&НаКлиенте Процедура СделатьФото(Команда) #Если МобильноеПриложениеКлиент Тогда Если НЕ СредстваМультимедиа.ПоддерживаетсяФотоснимок(ТипКамерыУстройства.Задняя) Тогда Сообщение = Новый СообщениеПользователю(); Сообщение.Текст = "Устройство не поддерживает фото"; Сообщение.Сообщить(); Возврат; КонецЕсли; РазрешениеФото = Новый РазрешениеКамерыУстройства(640, 480); ДанныеМультимедиа = СредстваМультимедиа.СделатьФотоснимок(ТипКамерыУстройства.Задняя, РазрешениеФото); Если ДанныеМультимедиа = Неопределено Тогда Сообщение = Новый СообщениеПользователю(); Сообщение.Текст = "Не удалось выполнить фото"; Сообщение.Сообщить(); Возврат; КонецЕсли; ДвоичныеДанныеФото = ДанныеМультимедиа.ПолучитьДвоичныеДанные(); ТекущаяФотография = ПоместитьВоВременноеХранилище(ДвоичныеДанныеФото, УникальныйИдентификатор); #КонецЕсли КонецПроцедуры
Код обработчика «Отправить фото»:
&НаСервере Процедура ОтправитьФотоНаСервере() Если НЕ ЭтоАдресВременногоХранилища(ТекущаяФотография) Тогда Сообщение = Новый СообщениеПользователю(); Сообщение.Текст = "Сделайте фото, после выполните отправку"; Сообщение.Сообщить(); Возврат; КонецЕсли; ДвоичныеДанныеФото = ПолучитьИзВременногоХранилища(ТекущаяФотография); ЗаписьТемы = Новый Структура("records", Новый Массив); ДанныеФото = Новый Структура("value", Base64Строка(ДвоичныеДанныеФото)); ЗаписьТемы["records"].Добавить(ДанныеФото); ЗаписьТемыJSON = КоннекторHTTP.ОбъектВJson(ЗаписьТемы); Заголовки = Новый Соответствие; Заголовки.Вставить("Content-Type", "application/vnd.kafka.json.v2+json"); Результат = КоннекторHTTP.Post("http://185.185.70.123:8082/topics/SendingPhotos", ЗаписьТемыJSON, Новый Структура("Заголовки", Заголовки)); Если Результат.КодСостояния = 200 Тогда Сообщение = Новый СообщениеПользователю(); Сообщение.Текст = "Фото отправлено"; Сообщение.Сообщить(); Иначе Сообщение = Новый СообщениеПользователю(); Сообщение.Текст = "Ошибка отправки фото, см. лог файлы."; Сообщение.Сообщить(); КонецЕсли; КонецПроцедуры
Необходимо опубликовать приложения для мобильной платформы на локальном компьютере, более подробно процесс описан в публикации «Разработка мобильных приложений 1С».
Устанавливаем приложение на устройство в мобильную платформу и производим тестирование, для этого запускаем приложение в мобильной платформе 1С, делаем и отправляем фотографию. В случае успешной отправки, возвратится код 200 и номер смещения в разделе темы «SendingPhotos». Проверить можно в выводе Kafka REST Proxy:
Создание конфигурации загрузки фото из 1С Kafka
Заключительным шагом, будет создание конфигурации 1С, для клиент-серверного варианта, целью которой является загрузка фотографий из Apache Kafka используя Kafka REST Proxy.
Для этого создается пустая конфигурация, в которую необходимо добавить «Коннектор: удобный HTTP-клиент для 1С:Предприятие 8» от Владимира Бондаревского, как это сделать подробно описано в публикации «Телеграм бот 1С».
Для хранения полученных из 1С Kafka фотографий, будет использоваться простой регистр сведений «Фотографии», следующей структуры:
Далее создаем общую форму и размещаем ее в рабочей области начальной страницы. На форме создается команда «ПолучитьФотографии» и динамический список, где в качестве основной таблицы, используется регистр сведений «Фотографии».
Код команды «Получить фотографии»:
&НаСервере Процедура ПолучитьФотографииНаСервере() // Создание подписчика Заголовки = Новый Соответствие; Заголовки.Вставить("Content-Type", "application/vnd.kafka.v2+json"); Заголовки = Новый Структура("Заголовки", Заголовки); ПараметрыПодписчика = Новый Структура; ПараметрыПодписчика.Вставить("name", "ConsumerInstance_" + Новый УникальныйИдентификатор()); ПараметрыПодписчика.Вставить("format", "json"); ПараметрыПодписчика.Вставить("auto_offset_reset", "earliest"); ПараметрыПодписчика = КоннекторHTTP.ОбъектВJson(ПараметрыПодписчика); ПараметрыПодписчика = СтрЗаменить(ПараметрыПодписчика, "auto_offset_reset", "auto.offset.reset"); РезультатСоздания = КоннекторHTTP.Post("http://185.185.70.123:8082/consumers/MainConsumerGroup", ПараметрыПодписчика, Заголовки); Если НЕ РезультатСоздания.КодСостояния = 200 Тогда ЗаписьЖурналаРегистрации("Kafka", УровеньЖурналаРегистрации.Ошибка, , , "Ошибка создания подписчика"); Возврат; КонецЕсли; БазовыйАдрес = КоннекторHTTP.КакJson(РезультатСоздания)["base_uri"]; // Подписка на тему ТемаПодписки = Новый Массив(); ТемаПодписки.Добавить("SendingPhotos"); ПараметрПодписки = Новый Структура; ПараметрПодписки.Вставить("topics", ТемаПодписки); ПараметрПодписки = КоннекторHTTP.ОбъектВJson(ПараметрПодписки); Заголовки = Новый Соответствие; Заголовки.Вставить("Content-Type", "application/vnd.kafka.json.v2+json"); Заголовки = Новый Структура("Заголовки", Заголовки); РезультатПодписки = КоннекторHTTP.Post(БазовыйАдрес + "/subscription", ПараметрПодписки, Заголовки); // Получение данных ЗаголовкиСериализации = Новый Соответствие; ЗаголовкиСериализации.Вставить("Accept", "application/vnd.kafka.json.v2+json"); ЗаголовкиСериализации = Новый Структура("Заголовки", ЗаголовкиСериализации); ПолученныеДанные = КоннекторHTTP.Get(БазовыйАдрес + "/records", , ЗаголовкиСериализации); Если ПолученныеДанные.КодСостояния = 200 Тогда ПолученныеДанные = КоннекторHTTP.КакJson(ПолученныеДанные); Если ПолученныеДанные.Количество() > 0 Тогда ДатаСобытия = ТекущаяДата(); НаборЗаписей = РегистрыСведений.Фотографии.СоздатьНаборЗаписей(); НаборЗаписей.Отбор.ДатаСобытия.Установить(ДатаСобытия); Для каждого ЗаписьДанных Из ПолученныеДанные Цикл ЗаписьНабора = НаборЗаписей.Добавить(); ЗаписьНабора.ДатаСобытия = ДатаСобытия; ЗаписьНабора.Раздел = ЗаписьДанных["partition"]; ЗаписьНабора.Смещение = ЗаписьДанных["offset"]; ЗаписьНабора.Ключ = ЗаписьДанных["key"]; ЗаписьНабора.Тема = ЗаписьДанных["topic"]; ЗаписьНабора.Данные = ЗаписьДанных["value"]; КонецЦикла; НаборЗаписей.Записать(); Элементы.Фотографии.Обновить(); КонецЕсли; КонецЕсли; // Удаление подписчика КоннекторHTTP.Delete(БазовыйАдрес, , Заголовки); КонецПроцедуры
Для регистра сведений, создана форма записи, с целью показа изображения, код формы:
&НаСервере Процедура ПриСозданииНаСервере(Отказ, СтандартнаяОбработка) ДвоичныеДанныеФото = Base64Значение(Запись.Данные); Фотография = ПоместитьВоВременноеХранилище(ДвоичныеДанныеФото, УникальныйИдентификатор) КонецПроцедуры
Теперь можно проводить полноценное тестирование информационной системы «1С Kafka»:
💾 Конфигурация мобильного приложения «Отправка фотографий»
💾 Конфигурация клиентского приложения «Загрузка фотографий»
Источники информации:
- How To Install Java with Apt on Debian 10
- Установка Apache Kafka в Debian 10
- 1С kafka. Не пишите свой REST сервис для кафки на питоне
- Apache Kafka, открытый базовый курс
- Про Kafka (основы)
- Книга «Apache Kafka. Потоковая обработка и анализ данных», авторы: Ния Нархид, Гвен Шапира, Тодд Палино
Судя по всему, у вас дефолтом включен enable.auto.commit = true у инстанса консьюмера.
Сие не очень хорошо.
Любое падение 1С при обработке фотографий — данные будут навсегда утеряны.
Выключить бы эту опцию, обработку в Попытку-Исключение, в конце фиксацию офсета на максимальный обработанный.
Константин опечатка в строке запуска REST ‘kafla-rest’
Запускаем Kafka REST Proxy:
root@box-669849:~/confluent-6.2.0/bin# ./kafka-rest-start ../etc/kafla-rest/kafka-rest.properties
Спасибо, исправлено.