The example demonstrates the procedure for importing tasks and task properties from Excel.
This example is in the “pycerebro/examples” directory of the server module. This directory also contains a sample file for import: “import.xlsx”
pycerebro/examples/excel_import.py
# -*- coding: utf-8 -*-
"""
Пример импорта задач из Excel.
Этот пример демонстрирует импорт задач и свойств задач из Excel.
Для чтения файлов Excel используется сторонний пакет xlrd (https://pypi.python.org/pypi/xlrd)
Для работы с временными зонами используются сторонние пакеты pytz (https://pypi.python.org/pypi/pytz/) и tzlocal (https://pypi.python.org/pypi/tzlocal)
В модуле используются следующие функции:
do_import - Функция импорта. Принимает параметры: Имя пользователя, Пароль пользователя, Путь до задачи, Путь к файлу Excel.
read - Функция, которая записывает свойства задачи и всех вложенных задач из Excel в Cerebro.
connect_db - Функция для соединения с базой данных Cerebro.
write_info, write_error - Функции для логирования.
Пример использования:
do_import('Имя_пользователя', 'Пароль_пользователя', '/Путь/к/Задаче', 'C:/путь/к/файлу.xlsx')
"""
# Следующие два параметра, хост и порт - это наш главный сервер с базой данных.
# У вас эти параметры могут быть иными, если вы используете свою базу данных
host = 'db.cerebrohq.com'
port = 45432
# Имена колонок Excel
columns = {1: "Описание", 2: "Назначено", 3: "Начало", 4: "Окончание", 5: "Запланировано"}
# Индекс колонки с именем задачи
name_column = 0
# Пропуск 1 строки заголовка
skeep_first_rows = 1
# Разделитель имен задач
tasks_delimeter = '/'
import xlrd
from py_cerebro import database, dbtypes
import datetime
import pytz
import tzlocal
def do_import(db_user, db_password, task, file_name):
"""
Функция импорта.
Параметры db_user и db_password - логин и пароль пользователя Cerebro.
task - тектовый локатор(путь) до задачи, в которую выполняется импорт.
Формат локатора: '/Проект/Задача 1/Задача 2', то есть по сути путь до задачи.
Примечание: Имена задач регистрозависимы!
Пример вызова функции:
::
import excel_import
excel_import.do_import('user', 'password', '/Проект/Задача 1/Задача 2', 'c:/temp/import.xlsx')
::
"""
# Устанавливаем соединение с базой данных
db = connect_db(db_user, db_password)
if (db):
write_info('Connected to db: ' + host)
# Открываем файл Excel
rb = xlrd.open_workbook(file_name)
if (rb):
# получаем первый лист
sheet = rb.sheet_by_index(0)
if (sheet):
# Получаем идентификатор задачи (проекта) в которую осуществляется импорт
task_id = db.task_by_url(task)[0]
if (task_id):
read(db, task_id, sheet)
else:
write_error('Task (Project) not found')
write_info('Import finished!')
else:
write_error('Can not connect to db: ' + host)
def read(db, task_id, sheet):
"""
Функция для чтения данных из Excel и записи данных в Cerebro.
db - переменная для работы с базой данных.
task_id - идентификатор задачи.
sheet - лист Excel.
"""
# Получили дату отсчета
datetime_2000 = datetime.datetime(2000, 1, 1, tzinfo=pytz.utc)
# Построчно читаем файл
for rownum in range(sheet.nrows):
# Пропускаем строку заголовка
if (rownum +1 > skeep_first_rows):
# Получили строку
row = sheet.row_values(rownum)
# Получили идентификатор задачи, в которую осуществляется импорт
new_task_id = task_get_create(db, task_id, row[name_column])
if (new_task_id):
for col in columns:
if ('Описание' == columns[col]):
# Импорт Постановки задачи
db.add_definition(new_task_id, row[col])
elif ('Назначено' == columns[col]):
# Импорт пользователей, нанзаченных на задачу
alloc_ids = set()
# Получили список имен пользователей
user_names = row[col].split('; ')
for user in db.users():
# Если пользователь найден в списке пользователей Cerebro, то добавляем его идентификатор в список
if user[dbtypes.USER_DATA_FULL_NAME] in user_names:
alloc_ids.add(user[dbtypes.USER_DATA_ID])
# Назначаем пользователей на задачу
if (alloc_ids):
db.task_set_allocated(new_task_id, alloc_ids)
elif ('Начало' == columns[col]):
# Импорт даты старта задачи
if (row[col]):
# Получаем количество дней от даты отсчета, подробнее в описании функции task_set_start
date = datetime.datetime.strptime(row[col], '%d.%m.%y %H:%M')
localtz = tzlocal.get_localzone()
utcdate = localtz.localize(date)
timedelta = utcdate - datetime_2000
days = timedelta.total_seconds()/(24*60*60)
# Устанавливаем дату старта задачи
db.task_set_start(new_task_id, days)
elif ('Окончание' == columns[col]):
# Импорт даты окончания
if (row[col]):
# Получаем количество дней от даты отсчета, подробнее в описании функции task_set_start
date = datetime.datetime.strptime(row[col], '%d.%m.%y %H:%M')
localtz = tzlocal.get_localzone()
utcdate = localtz.localize(date)
timedelta = utcdate - datetime_2000
days = timedelta.total_seconds()/(24*60*60)
# Устанавливаем дату окончания
db.task_set_finish(new_task_id, days)
elif ('Запланировано' == columns[col]):
# Импорт запланированных часов
if (row[col]):
db.task_set_planned_time(new_task_id, row[col])
else:
db.task_set_planned_time(new_task_id, 0.016)
def task_get_create(db, parent_task_id, name):
"""
Функция для создания задачи по локатору.
Если задача существует, то функция вернет идентификатор задачи.
db - переменная для работы с базой данных.
parent_task_id - идентификатор задачи, в которой будет создана новая задача.
name - текстовый локатор задачи.
"""
task_id = None
# Формируем полный путь до задачи
path = str(db.task(parent_task_id)[dbtypes.TASK_DATA_PARENT_URL]) + str(db.task(parent_task_id)[dbtypes.TASK_DATA_NAME]) + '/' + name
# Пытаемся получить id задачи
task_id = db.task_by_url(path)[0]
if (not task_id):
# Если задача не существует, создаем ее
if (name.find(tasks_delimeter) != -1):
# Получаем список имен задач
spl_name = name.split(tasks_delimeter)
t_id = parent_task_id
# Создаем дерево задач
for nm in spl_name:
if (nm):
t_id = task_get_create(db, t_id, nm)
# Если хотябы одна задача создана, присваиваем возвращаемой переменной ее идентификатор
if (t_id != parent_task_id):
task_id = t_id
else:
task_id = db.add_task(parent_task_id, name)
return task_id
def connect_db(user, password):
"""
Функция для соединения с базой данных.
user - имя пользователя cerebro.
password - пароль пользователя cerebro.
"""
# Создаем объект базы данных
db = database.Database(host, port)
# Соединяемся с базой данных
db.connect(user, password)
return db
def write_info(text):
"""
Функция для логирования информационных сообщений.
text - текст сообщения.
"""
print('info: ' + text)
def write_error(text):
"""
Функция для логирования ошибок.
text - текст сообщения.
"""
print('error: ' + text)