#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This code is mostly PEP8-compliant. See
# http://www.python.org/dev/peps/pep-0008.
import multiprocessing
import time
import os
import os.path
import re
import xml.dom.minidom
import socket
import wave
from datetime import datetime
from collections import deque
from alex.utils.mproc import etime
from alex.utils.exdec import catch_ioerror
from alex.utils.exceptions import SessionLoggerException, SessionClosedException
from alex.utils.procname import set_proc_name
[docs]class SessionLogger(multiprocessing.Process):
"""
This is a multiprocessing-safe logger. It should be used by Alex to log
information according to the SDC 2010 XML format.
Date and times should also include time zone.
Times should be in seconds from the beginning of the dialogue.
"""
def __init__(self):
multiprocessing.Process.__init__(self)
self._session_dir_name = ''
self._session_start_time = time.time()
self._is_open = False # whether the session is started
self._doc = None
# filename of the started recording
self._rec_started = {}
self.queue = multiprocessing.Queue()
self._queue = deque()
[docs] def set_close_event(self, close_event):
self.close_event = close_event
[docs] def set_cfg(self, cfg):
self.cfg = cfg
[docs] def cancel_join_thread(self):
self.queue.cancel_join_thread()
def __repr__(self):
return "SessionLogger()"
def __getattr__(self, key):
"""Queue all method calls for methods not known, Later the process will try to call these functions
asynchronously.
"""
@etime('SessionLoggerQueue: '+key)
def queue(*args, **kw):
# print "Queueing a call", key, args, kw
self.queue.put((key, args, kw, time.time()))
return queue
def _get_date_str(self):
""" Return current time in ISO format.
It is useful when constructing file and directory names.
"""
dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " " + time.tzname[time.localtime().tm_isdst]
return dt
def _get_time_str(self):
""" Return current time in ISO format.
It is useful when constructing file and directory names.
"""
dt = time.time() - self._session_start_time
return "%.3f" % dt
@etime('seslog_session_start')
def _session_start(self, output_dir):
    """Record the target directory and create the template call log.

    Writes an empty ``<dialogue>`` skeleton to ``session.xml`` inside
    ``output_dir``, restarts the session clock, parses the skeleton into
    ``self._doc`` and marks the session as open.

    Args:
        output_dir: directory that receives ``session.xml``; it is used as
            given, nothing in this method creates it.
    """
    self._session_dir_name = output_dir

    template = ('<?xml version="1.0" encoding="UTF-8"?>\n'
                '<dialogue>\n'
                '</dialogue>\n'
                '\n')

    # The context manager closes the handle even if the write fails.
    with open(os.path.join(self._session_dir_name, 'session.xml'), "w") as f:
        f.write(template)

    self._session_start_time = time.time()
    self._read_session_xml()
    self._is_open = True
def _flush(self):
# close all opened rec_started files
for f in self._rec_started:
if self._rec_started[f]:
self._rec_end(f)
@etime('seslog_session_end')
def _session_end(self):
    """
    *WARNING: Deprecated* Stops logging into the session-specific directory.

    Ending a session is discouraged because messages frequently keep
    arriving after session_end() has been called. It is better to wait for
    the next session_start(), which sets a new destination for the log.

    The method closes all open recordings, saves the XML log one last time
    and then detaches the logger from the session.
    """
    self._flush()
    self._write_session_xml()

    # Detach from the finished session.
    self._is_open = False
    self._doc = None
    self._session_dir_name = ''
def _cfg_formatter(self, message):
""" Format the message - pretty print
"""
s = ' ' + unicode(message)
s = re.sub(r'\n', '\n ', s)
s = re.sub(r'--', '- -', s) # XML does not allow -- in comment mode
return s + '\n'
def _read_session_xml(self):
"""Opens the session xml file.
"""
with open(os.path.join(self._session_dir_name, 'session.xml'), "r+", 0) as f:
# fcntl.lockf(self._f, fcntl.LOCK_EX)
self._doc = xml.dom.minidom.parse(f)
# fcntl.lockf(f, fcntl.LOCK_UN)
def _write_session_xml(self):
"""Saves the self._doc self._document into the session xml file.
"""
with open(os.path.join(self._session_dir_name, 'session.xml'), "r+", 0) as f:
# fcntl.lockf(self._f, fcntl.LOCK_EX)
f.seek(0)
f.truncate(0)
x = self._doc.toprettyxml(encoding='utf-8')
for i in range(5):
x = re.sub(r'\n\t*\n', '\n', x)
x = re.sub(r'\n *\n', '\n', x)
# x = re.sub(r'>\n\t*(\w)', r'>\1', x)
x = re.sub(r'\t', ' ', x)
# x = unicode(x, encoding='utf-8')
f.write(x)
# fcntl.lockf(f, fcntl.LOCK_UN)
@etime('seslog_config')
@catch_ioerror
def _config(self, cfg):
    """Add the config element to the session log.

    The element becomes the first child of the ``dialogue`` element and
    holds ``cfg``, formatted by _cfg_formatter(), as an XML comment.
    """
    dialogues = self._doc.getElementsByTagName("dialogue")

    if dialogues:
        root = dialogues[0]
        # insertBefore() with a reference node of None appends, which
        # covers the case of a dialogue element without children.
        config = root.insertBefore(self._doc.createElement("config"), root.firstChild)
        config.appendChild(self._doc.createComment(self._cfg_formatter(cfg)))

    self._write_session_xml()
@etime('seslog_header')
@catch_ioerror
def _header(self, system_txt, version_txt):
    """Add the header element with host, date, system and version info.

    The host name and the date are derived automatically; the system and
    version texts come from the arguments.
    """
    dialogues = self._doc.getElementsByTagName("dialogue")

    if dialogues:
        doc = self._doc
        header = dialogues[0].appendChild(doc.createElement("header"))

        def new_field(tag):
            # Create an empty child of the header and hand it back.
            return header.appendChild(doc.createElement(tag))

        new_field("host").appendChild(doc.createTextNode(socket.gethostname()))
        new_field("date").appendChild(doc.createTextNode(self._get_date_str()))
        new_field("system").appendChild(doc.createTextNode(system_txt))
        new_field("version").appendChild(doc.createTextNode(version_txt))

    self._write_session_xml()
@etime('seslog_input_source')
@catch_ioerror
def _input_source(self, input_source):
    """Add the optional input_source element to the header.

    Nothing is added when the log has no header element yet; the log is
    saved in either case.
    """
    headers = self._doc.getElementsByTagName("header")

    if headers:
        source_el = self._doc.createElement("input_source")
        headers[0].appendChild(source_el)
        source_el.setAttribute("type", input_source)

    self._write_session_xml()
@etime('seslog_dialogue_rec_start')
# @catch_ioerror - do not add! VIO catches the IOError
def _dialogue_rec_start(self, speaker, fname):
    """Add a dialogue_rec element for a whole-dialogue recording.

    The element is appended to the ``dialogue`` element and carries the
    file name and the start time; the speaker attribute is only set when
    ``speaker`` is truthy.

    FIXME: It can happen that the session.xml is not created when this
    function is called.

    Raises:
        SessionLoggerException: if the log has no dialogue element.
    """
    dialogues = self._doc.getElementsByTagName("dialogue")

    if not dialogues:
        self._write_session_xml()
        raise SessionLoggerException(("Missing dialogue element for %s speaker") % speaker)

    rec_el = dialogues[0].appendChild(self._doc.createElement("dialogue_rec"))
    if speaker:
        rec_el.setAttribute("speaker", speaker)
    rec_el.setAttribute("fname", fname)
    rec_el.setAttribute("starttime", self._get_time_str())

    self._write_session_xml()
@etime('seslog_dialogue_rec_end')
# @catch_ioerror - do not add! VIO catches the IOError
def _dialogue_rec_end(self, fname):
    """Store the end time in the latest dialogue_rec element for ``fname``.

    Raises:
        SessionLoggerException: if no dialogue_rec element has this fname.
    """
    recs = self._doc.getElementsByTagName("dialogue_rec")
    # Search from the end so that the most recent matching element wins.
    match = next((rec for rec in reversed(recs)
                  if rec.getAttribute("fname") == fname), None)

    if match is None:
        self._write_session_xml()
        raise SessionLoggerException("Missing dialogue_rec element for %s fname" % fname)

    match.setAttribute("endtime", self._get_time_str())
    self._write_session_xml()
@etime('seslog_evaluation')
@catch_ioerror
def _evaluation(self, num_turns, task_success, user_sat, score):
    """Placeholder for adding the optional evaluation element to the header.

    Raises:
        SessionLoggerException: always; none of the arguments is used.
    """
    # Logging of evaluations has not been written yet.
    raise SessionLoggerException("Not implemented")
def _turn_count(self, speaker):
trns = self._doc.getElementsByTagName("turn")
counter = 0
if trns:
for i in range(trns.length):
if trns[i].getAttribute("speaker") == speaker:
counter += 1
return counter
return 0
@etime('seslog_turn')
@catch_ioerror
def _turn(self, speaker):
    """Append a new turn for ``speaker`` to the dialogue element.

    The turn number is one more than the number of turns the speaker
    already has in the log.
    """
    dialogues = self._doc.getElementsByTagName("dialogue")
    next_number = self._turn_count(speaker) + 1

    if dialogues:
        new_turn = self._doc.createElement("turn")
        dialogues[0].appendChild(new_turn)
        new_turn.setAttribute("speaker", speaker)
        new_turn.setAttribute("turn_number", unicode(next_number))
        new_turn.setAttribute("time", self._get_time_str())

    self._write_session_xml()
@etime('seslog_dialogue_act')
@catch_ioerror
def _dialogue_act(self, speaker, dialogue_act):
    """Add a dialogue_act element to the last turn of ``speaker``.

    Raises:
        SessionLoggerException: if the speaker has no turn in the log (the
            log is saved before the exception is raised).
    """
    # The helper performs the backwards search and the error handling.
    turn = self._last_turn_element(speaker)

    act_el = turn.appendChild(self._doc.createElement("dialogue_act"))
    act_el.setAttribute("time", self._get_time_str())
    act_el.appendChild(self._doc.createTextNode(unicode(dialogue_act)))

    self._write_session_xml()
@etime('seslog_text')
@catch_ioerror
def _text(self, speaker, text, cost=None):
    """Add a text (prompt) element to the last turn of ``speaker``.

    The ``cost`` attribute is only written when ``cost`` is truthy.

    Raises:
        SessionLoggerException: if the speaker has no turn in the log (the
            log is saved before the exception is raised).
    """
    turns = self._doc.getElementsByTagName("turn")
    turn = next((t for t in reversed(turns)
                 if t.getAttribute("speaker") == speaker), None)

    if turn is None:
        self._write_session_xml()
        raise SessionLoggerException("Missing turn element for {spkr} speaker".format(spkr=speaker))

    text_el = turn.appendChild(self._doc.createElement("text"))
    text_el.setAttribute("time", self._get_time_str())
    if cost:
        text_el.setAttribute("cost", unicode(cost))
    text_el.appendChild(self._doc.createTextNode(unicode(text)))

    self._write_session_xml()
@etime('seslog_rec_start')
@catch_ioerror
def _rec_start(self, speaker, fname):
    """Add a rec element to the last turn of ``speaker`` and open its file.

    After the log is saved, a wave file named ``fname`` is opened for
    writing in the session directory (one channel, two bytes per sample,
    frame rate taken from ``cfg['Audio']['sample_rate']``) and registered
    in ``self._rec_started``.

    Raises:
        SessionLoggerException: if the speaker has no turn in the log (the
            log is saved before the exception is raised).
    """
    turns = self._doc.getElementsByTagName("turn")
    turn = next((t for t in reversed(turns)
                 if t.getAttribute("speaker") == speaker), None)

    if turn is None:
        self._write_session_xml()
        raise SessionLoggerException(("Missing turn element for the {spkr} speaker".format(spkr=speaker)))

    rec_el = turn.appendChild(self._doc.createElement("rec"))
    rec_el.setAttribute("fname", fname)
    rec_el.setAttribute("starttime", self._get_time_str())
    self._write_session_xml()

    # Register the writer first, then configure it through a local alias.
    writer = wave.open(os.path.join(self._session_dir_name, fname), 'w')
    self._rec_started[fname] = writer
    writer.setnchannels(1)
    writer.setsampwidth(2)
    writer.setframerate(self.cfg['Audio']['sample_rate'])
@etime('seslog_rec_write')
@catch_ioerror
def _rec_write(self, fname, data_rec):
    """Append audio data to the open recording ``fname``.

    Args:
        fname: file name under which the recording was started.
        data_rec: the audio frames; converted with ``bytearray()`` before
            being written.

    Raises:
        SessionLoggerException: if no recording with this name is open.
            This covers both a name that was never started and one that
            _rec_end() has already closed (its entry is then None).
            Without the second check a late write would fail with an
            AttributeError, which run() treats as fatal.
    """
    writer = self._rec_started.get(fname)
    if writer is None:
        raise SessionLoggerException("rec_write: missing rec element %s" % fname)

    writer.writeframes(bytearray(data_rec))
@etime('seslog_rec_end')
@catch_ioerror
def _rec_end(self, fname):
    """Store the end time in the rec element of ``fname`` and close its file.

    The most recent rec element with a matching ``fname`` receives the
    ``endtime`` attribute and the log is saved; afterwards the wave file
    is closed and its entry in ``self._rec_started`` is reset to None.

    Raises:
        SessionLoggerException: if the log has no rec element for
            ``fname``, or if no recording with this name is open.  The
            latter includes a recording that was already ended; without
            that check a repeated end would fail with an AttributeError,
            which run() treats as fatal.
    """
    recs = self._doc.getElementsByTagName("rec")
    # Search from the end so that the most recent matching element wins.
    for rec_el in reversed(recs):
        if rec_el.getAttribute("fname") == fname:
            rec_el.setAttribute("endtime", self._get_time_str())
            break
    else:
        raise SessionLoggerException(("Missing rec element for the {fname} fname.".format(fname=fname)))

    self._write_session_xml()

    writer = self._rec_started.get(fname)
    if writer is None:
        raise SessionLoggerException("rec_end: missing rec element %s" % fname)

    writer.close()
    self._rec_started[fname] = None
def _include_rec(self, turn, fname):
if fname == "*":
return True
recs = turn.getElementsByTagName("rec")
for rec in recs:
if rec.getAttribute("fname") == fname:
return True
return False
@etime('seslog_asr')
@catch_ioerror
def _asr(self, speaker, fname, nblist, confnet=None):
    """Add the ASR n-best list to the last matching turn of ``speaker``.

    A turn matches when it belongs to the speaker and contains a rec
    element for ``fname`` (``"*"`` matches any turn).

    alex Extension: It can also store the confusion network representation.

    Args:
        nblist: iterable of ``(probability, hypothesis)`` pairs.
        confnet: optional iterable whose items are iterables of
            ``(probability, word)`` pairs.

    Raises:
        SessionLoggerException: if no matching turn exists (the log is
            saved before the exception is raised).
    """
    doc = self._doc
    turns = doc.getElementsByTagName("turn")
    turn = next((t for t in reversed(turns)
                 if t.getAttribute("speaker") == speaker and self._include_rec(t, fname)),
                None)

    if turn is None:
        self._write_session_xml()
        raise SessionLoggerException(("Missing turn element for %s speaker") % speaker)

    asr_el = turn.appendChild(doc.createElement("asr"))

    for prob, hyp in nblist:
        hyp_el = asr_el.appendChild(doc.createElement("hypothesis"))
        hyp_el.setAttribute("p", "{0:.3f}".format(prob))
        hyp_el.appendChild(doc.createTextNode(unicode(hyp)))

    if confnet:
        confnet_el = asr_el.appendChild(doc.createElement("confnet"))

        for alternatives in confnet:
            alts_el = confnet_el.appendChild(doc.createElement("word_alternatives"))

            for prob, word in alternatives:
                word_el = alts_el.appendChild(doc.createElement("word"))
                word_el.setAttribute("p", "{0:.3f}".format(prob))
                word_el.appendChild(doc.createTextNode(unicode(word)))

    self._write_session_xml()
@etime('seslog_slu')
@catch_ioerror
def _slu(self, speaker, fname, nblist, confnet=None):
    """Add the SLU n-best list to the last matching turn of ``speaker``.

    A turn matches when it belongs to the speaker and contains a rec
    element for ``fname`` (``"*"`` matches any turn).

    alex Extension: It can also store the confusion network representation.
    The confnet must be an instance of DialogueActConfusionNetwork; here it
    is only iterated as ``(probability, dialogue act item)`` pairs.  Each
    pair is stored together with a ``null()`` alternative that carries the
    complementary probability.

    Raises:
        SessionLoggerException: if no matching turn exists (the log is
            saved before the exception is raised).
    """
    doc = self._doc
    turns = doc.getElementsByTagName("turn")
    turn = next((t for t in reversed(turns)
                 if t.getAttribute("speaker") == speaker and self._include_rec(t, fname)),
                None)

    if turn is None:
        self._write_session_xml()
        raise SessionLoggerException(("Missing turn element for %s speaker") % speaker)

    slu_el = turn.appendChild(doc.createElement("slu"))

    for prob, interpretation in nblist:
        interp_el = slu_el.appendChild(doc.createElement("interpretation"))
        interp_el.setAttribute("p", "%.3f" % prob)
        interp_el.appendChild(doc.createTextNode(unicode(interpretation)))

    if confnet:
        confnet_el = slu_el.appendChild(doc.createElement("confnet"))

        for prob, dai in confnet:
            alts_el = confnet_el.appendChild(doc.createElement("dai_alternatives"))

            dai_el = alts_el.appendChild(doc.createElement("dai"))
            dai_el.setAttribute("p", "%.3f" % prob)
            dai_el.appendChild(doc.createTextNode(unicode(dai)))

            null_el = alts_el.appendChild(doc.createElement("dai"))
            null_el.setAttribute("p", "%.3f" % (1 - prob))
            null_el.appendChild(doc.createTextNode("null()"))

    self._write_session_xml()
@etime('seslog_barge_in')
@catch_ioerror
def _barge_in(self, speaker, tts_time=False, asr_time=False):
    """Add the optional barge-in element to the last turn of ``speaker``.

    ``tts_time`` and ``asr_time`` act as flags: when truthy, the attribute
    of the same name is added and holds the current session time.

    Raises:
        SessionLoggerException: if the speaker has no turn in the log.
    """
    turns = self._doc.getElementsByTagName("turn")
    turn = next((t for t in reversed(turns)
                 if t.getAttribute("speaker") == speaker), None)

    if turn is None:
        raise SessionLoggerException(("Missing turn element for %s speaker") % speaker)

    barge_el = turn.appendChild(self._doc.createElement("barge-in"))
    barge_el.setAttribute("time", self._get_time_str())
    if tts_time:
        barge_el.setAttribute("tts_time", self._get_time_str())
    if asr_time:
        barge_el.setAttribute("asr_time", self._get_time_str())

    self._write_session_xml()
@etime('seslog_hangup')
@catch_ioerror
def _hangup(self, speaker):
    """Add a hangup element to the last turn of ``speaker``.

    Raises:
        SessionLoggerException: if the speaker has no turn in the log (the
            log is saved before the exception is raised).
    """
    # The helper performs the backwards search and the error handling.
    turn = self._last_turn_element(speaker)
    turn.appendChild(self._doc.createElement("hangup"))

    self._write_session_xml()
########################################################################
## The following functions define functionality above what was set in ##
## SDC 2010 XML logging format. ##
########################################################################
def _last_turn_element(self, speaker):
""" Finds the XML element in the given open XML session
which corresponds to the last turn for the given speaker.
Closes the XML and throws an exception if the element cannot be found.
"""
els = self._doc.getElementsByTagName("turn")
for i in range(els.length - 1, -1, -1):
if els[i].getAttribute("speaker") == speaker:
return els[i]
else:
self._write_session_xml()
raise SessionLoggerException(("Missing turn element for %s speaker") % speaker)
@etime('seslog_dialogue_state')
@catch_ioerror
def _dialogue_state(self, speaker, dstate):
    """Add the dialogue state to the last turn of ``speaker``.

    This is an alex extension.

    The dstate has the following structure:
        [state1, state2, ...]
    where each state is a sequence of pairs:
        [(slot_name1, slot_value1), (slot_name2, slot_value2), ...]

    Every state becomes one dialogue_state element with one slot element
    per pair.
    """
    turn = self._last_turn_element(speaker)

    for slots in dstate:
        state_el = turn.appendChild(self._doc.createElement("dialogue_state"))

        for name, value in slots:
            slot_el = state_el.appendChild(self._doc.createElement("slot"))
            slot_el.setAttribute("name", "%s" % name)
            slot_el.appendChild(self._doc.createTextNode(unicode(value)))

    self._write_session_xml()
@etime('seslog_external_data_file')
@catch_ioerror
def _external_data_file(self, ftype, fname, data=None):
    """Write data to an external file and add a link to the log.

    An <external> element with the "type" and "fname" attributes is added
    to the last system turn; the attribute holds only the base name of
    ``fname``, while the data is written to ``fname`` exactly as given.
    If the data is None, no file is created, just the link.

    This is an alex extension.
    """
    # Create the link in the log first.
    system_turn = self._last_turn_element("system")
    link = system_turn.appendChild(self._doc.createElement("external"))
    link.setAttribute("type", ftype)
    link.setAttribute("fname", os.path.basename(fname))
    self._write_session_xml()

    if data is None:
        return

    # Then write the file data.
    with open(fname, 'w') as fh:
        fh.write(data)
def run(self):
    """Main loop of the logger process.

    Until the close event is set, the loop sleeps for
    ``cfg['Hub']['main_loop_sleep_time']``, moves all pending commands from
    the inter-process queue into the local backlog and executes the oldest
    one by calling the method named ``'_' + cmd`` of SessionLogger.

    A command that arrives while no session is open (and is not itself
    ``session_start``) waits up to three seconds, counted from the moment
    it was queued, for a ``session_start`` to show up in the backlog.  If
    one is found it is executed first; otherwise the command is dropped.

    Logging errors (SessionLoggerException, SessionClosedException) are
    printed and the loop continues.  An unknown command, like any other
    unexpected exception, sets the close event and is re-raised.
    """
    try:
        set_proc_name("Alex_SessionLogger")

        # Time at which the latest session_end command was taken from the
        # backlog.
        last_session_end_time = 0

        while 1:
            # Check the close event.
            if self.close_event.is_set():
                print('Received close event in: %s' % multiprocessing.current_process().name)
                return

            time.sleep(self.cfg['Hub']['main_loop_sleep_time'])

            s = (time.time(), time.clock())

            while not self.queue.empty():
                self._queue.append(self.queue.get())

            if self._queue:
                cmd, args, kw, cmd_time = self._queue.popleft()
                attr = '_' + cmd

                try:
                    if cmd == 'session_end':
                        last_session_end_time = time.time()

                    if not self._is_open and cmd != 'session_start':
                        session_start_found = False

                        while time.time() - cmd_time < 3.0 and not session_start_found:
                            # Keep pulling new commands while waiting;
                            # otherwise a session_start that arrives during
                            # the wait could never be seen.
                            while not self.queue.empty():
                                self._queue.append(self.queue.get())

                            # these are probably commands for the new un-opened session
                            for i, (_cmd, _args, _kw, _cmd_time) in enumerate(self._queue):
                                if _cmd == 'session_start':
                                    print("SessionLogger: finally found session start")
                                    self._session_start(*_args, **_kw)
                                    del self._queue[i]
                                    session_start_found = True
                                    break
                            else:
                                time.sleep(self.cfg['Hub']['main_loop_sleep_time'])

                        if not session_start_found:
                            # NOTE(review): this test is true for every
                            # command queued later than two seconds before
                            # the last session end, so the warning below is
                            # rarely printed - confirm whether the operands
                            # were meant to be the other way round.
                            if last_session_end_time - cmd_time < 2.0:
                                # just silently ignore because these are
                                # likely to be commands for the already
                                # closed session
                                continue

                            print("SessionLogger: no session start found")
                            print("SessionLogger: calling method %s when the session is not open" % (cmd,))
                            print('  %s' % ([a for a in args if isinstance(a, basestring) and len(a) < 80],))
                            continue

                    # Indexing the class dictionary would raise KeyError
                    # for an unknown command and bypass the handler below.
                    cf = SessionLogger.__dict__.get(attr)
                    if cf is None:
                        raise AttributeError("SessionLogger has no method %s" % (attr,))
                    cf(self, *args, **kw)
                except AttributeError:
                    print("SessionLogger: unknown method %s" % (cmd,))
                    self.close_event.set()
                    raise
                except SessionLoggerException as e:
                    if cmd == 'rec_write':
                        # The arguments hold raw audio; do not print them.
                        print("Exception when logging: %s" % (cmd,))
                        print(e)
                    else:
                        print("Exception when logging: %s %s %s" % (cmd, args, kw))
                        print(e)
                except SessionClosedException as e:
                    print("Exception when logging: %s %s %s" % (cmd, args, kw))
                    print(e)

            d = (time.time() - s[0], time.clock() - s[1])
            if d[0] > 0.200:
                print("EXEC Time inner loop: SessionLogger t = {t:0.4f} c = {c:0.4f}\n".format(t=d[0], c=d[1]))

    except KeyboardInterrupt:
        print('KeyboardInterrupt exception in: %s' % multiprocessing.current_process().name)
        self.close_event.set()
        return
    except:
        print('Uncaught exception in the SessionLogger process.')
        self.close_event.set()
        raise

    # Not reached at present: the loop above is only left through a return
    # or an exception.
    print('Exiting: %s. Setting close event' % multiprocessing.current_process().name)
    self.close_event.set()